From a1f21938ed39d41ca444b7b4c2a77649ce02b648 Mon Sep 17 00:00:00 2001 From: Nishant Date: Wed, 28 Mar 2018 20:30:21 +0530 Subject: [PATCH] [HIVE-18976] Add ability to setup Druid Kafka Ingestion from Hive --- .../org/apache/hadoop/hive/conf/Constants.java | 7 + .../java/org/apache/hadoop/hive/conf/HiveConf.java | 3 + data/scripts/kafka_init_data.json | 10 + .../hadoop/hive/druid/DruidStorageHandler.java | 318 +++++++++++++- .../hive/druid/DruidStorageHandlerUtils.java | 148 ++++++- .../hadoop/hive/druid/io/DruidOutputFormat.java | 116 +---- .../hive/druid/json/KafkaSupervisorIOConfig.java | 247 +++++++++++ .../hive/druid/json/KafkaSupervisorSpec.java | 143 +++++++ .../druid/json/KafkaSupervisorTuningConfig.java | 200 +++++++++ .../hadoop/hive/druid/json/KafkaTuningConfig.java | 268 ++++++++++++ itests/qtest-druid/pom.xml | 11 + .../org/apache/hive/druid/MiniDruidCluster.java | 59 ++- .../apache/hive/kafka/SingleNodeKafkaCluster.java | 122 ++++++ .../hive/cli/TestMiniDruidKafkaCliDriver.java | 63 +++ .../apache/hadoop/hive/cli/control/CliConfigs.java | 23 + .../java/org/apache/hadoop/hive/ql/QTestUtil.java | 63 ++- pom.xml | 2 +- .../queries/clientpositive/druidkafkamini_basic.q | 73 ++++ .../druid/druidkafkamini_basic.q.out | 475 +++++++++++++++++++++ 19 files changed, 2195 insertions(+), 156 deletions(-) create mode 100644 data/scripts/kafka_init_data.json create mode 100644 druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorIOConfig.java create mode 100644 druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorSpec.java create mode 100644 druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorTuningConfig.java create mode 100644 druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java create mode 100644 itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java create mode 100644 itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java create mode 100644 ql/src/test/queries/clientpositive/druidkafkamini_basic.q create mode 100644 ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out diff --git a/common/src/java/org/apache/hadoop/hive/conf/Constants.java b/common/src/java/org/apache/hadoop/hive/conf/Constants.java index 5535d69ff9..ff9eb59981 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/Constants.java +++ b/common/src/java/org/apache/hadoop/hive/conf/Constants.java @@ -48,6 +48,13 @@ public static final String DRUID_SEGMENT_VERSION = "druid.segment.version"; public static final String DRUID_JOB_WORKING_DIRECTORY = "druid.job.workingDirectory"; + public static final String KAFKA_TOPIC = "kafka.topic"; + public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; + + public static final String DRUID_KAFKA_INGESTION_PROPERTY_PREFIX = "druid.kafka.ingestion."; + /* Kafka Ingestion state - valid values - START/STOP/RESET */ + public static final String DRUID_KAFKA_INGESTION = "druid.kafka.ingestion"; + public static final String HIVE_SERVER2_JOB_CREDSTORE_PASSWORD_ENVVAR = "HIVE_JOB_CREDSTORE_PASSWORD"; public static final String HADOOP_CREDENTIAL_PASSWORD_ENVVAR = "HADOOP_CREDSTORE_PASSWORD"; public static final String HADOOP_CREDENTIAL_PROVIDER_PATH_CONFIG = "hadoop.security.credential.provider.path"; diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 02367eb433..1fd824b29f 100644 --- 
a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2091,6 +2091,9 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal HIVE_DRUID_COORDINATOR_DEFAULT_ADDRESS("hive.druid.coordinator.address.default", "localhost:8081", "Address of the Druid coordinator. It is used to check the load status of newly created segments" ), + HIVE_DRUID_OVERLORD_DEFAULT_ADDRESS("hive.druid.overlord.address.default", "localhost:8090", + "Address of the Druid overlord. It is used to submit indexing tasks to druid." + ), HIVE_DRUID_SELECT_THRESHOLD("hive.druid.select.threshold", 10000, "Takes only effect when hive.druid.select.distribute is set to false. \n" + "When we can split a Select query, this is the maximum number of rows that we try to retrieve\n" + diff --git a/data/scripts/kafka_init_data.json b/data/scripts/kafka_init_data.json new file mode 100644 index 0000000000..9e2c58cbdb --- /dev/null +++ b/data/scripts/kafka_init_data.json @@ -0,0 +1,10 @@ +{"__time": "2013-08-31T01:02:33Z", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143} +{"__time": "2013-08-31T03:32:45Z", "page": "Striker Eureka", "language" : "en", "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia", "country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 459, "deleted": 129, "delta": 330} +{"__time": "2013-08-31T07:11:21Z", "page": "Cherno Alpha", "language" : "ru", "user" : "masterYi", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"article", "continent":"Asia", "country":"Russia", "region":"Oblast", "city":"Moscow", "added": 123, "deleted": 12, "delta": 111} +{"__time": "2013-08-31T11:58:39Z", "page": "Crimson Typhoon", "language" : "zh", "user" : "triplets", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"China", "region":"Shanxi", "city":"Taiyuan", "added": 905, "deleted": 5, "delta": 900} +{"__time": "2013-08-31T12:41:27Z", "page": "Coyote Tango", "language" : "ja", "user" : "stringer", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"Japan", "region":"Kanto", "city":"Tokyo", "added": 1, "deleted": 10, "delta": -9} +{"__time": "2013-09-01T01:02:33Z", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143} +{"__time": "2013-09-01T03:32:45Z", "page": "Striker Eureka", "language" : "en", "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia", "country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 459, "deleted": 129, "delta": 330} +{"__time": "2013-09-01T07:11:21Z", "page": "Cherno Alpha", "language" : "ru", "user" : "masterYi", "unpatrolled" : "false", "newPage" : "true", 
"robot": "true", "anonymous": "false", "namespace":"article", "continent":"Asia", "country":"Russia", "region":"Oblast", "city":"Moscow", "added": 123, "deleted": 12, "delta": 111} +{"__time": "2013-09-01T11:58:39Z", "page": "Crimson Typhoon", "language" : "zh", "user" : "triplets", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"China", "region":"Shanxi", "city":"Taiyuan", "added": 905, "deleted": 5, "delta": 900} +{"__time": "2013-09-01T12:41:27Z", "page": "Coyote Tango", "language" : "ja", "user" : "stringer", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"Japan", "region":"Kanto", "city":"Tokyo", "added": 1, "deleted": 10, "delta": -9} \ No newline at end of file diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java index 0904bd16b0..6a0da13451 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java @@ -24,6 +24,7 @@ import com.google.common.base.Suppliers; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -32,6 +33,16 @@ import com.metamx.http.client.HttpClient; import com.metamx.http.client.HttpClientConfig; import com.metamx.http.client.HttpClientInit; +import com.metamx.http.client.Request; +import com.metamx.http.client.response.StatusResponseHandler; +import com.metamx.http.client.response.StatusResponseHolder; +import io.druid.data.input.impl.DimensionSchema; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.InputRowParser; +import io.druid.data.input.impl.JSONParseSpec; +import io.druid.data.input.impl.StringInputRowParser; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.java.util.common.Pair; import io.druid.metadata.MetadataStorageConnectorConfig; import io.druid.metadata.MetadataStorageTablesConfig; import io.druid.metadata.SQLMetadataConnector; @@ -39,11 +50,16 @@ import io.druid.metadata.storage.derby.DerbyMetadataStorage; import io.druid.metadata.storage.mysql.MySQLConnector; import io.druid.metadata.storage.postgresql.PostgreSQLConnector; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.segment.IndexSpec; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.granularity.GranularitySpec; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.loading.SegmentLoadingException; import io.druid.storage.hdfs.HdfsDataSegmentPusher; import io.druid.storage.hdfs.HdfsDataSegmentPusherConfig; import io.druid.timeline.DataSegment; + import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -53,12 +69,16 @@ import org.apache.hadoop.hive.druid.io.DruidOutputFormat; import org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat; import org.apache.hadoop.hive.druid.io.DruidRecordWriter; +import org.apache.hadoop.hive.druid.json.KafkaSupervisorIOConfig; +import org.apache.hadoop.hive.druid.json.KafkaSupervisorSpec; +import 
org.apache.hadoop.hive.druid.json.KafkaSupervisorTuningConfig; import org.apache.hadoop.hive.druid.security.KerberosHttpClient; import org.apache.hadoop.hive.druid.serde.DruidSerDe; import org.apache.hadoop.hive.metastore.DefaultHiveMetaHook; import org.apache.hadoop.hive.metastore.HiveMetaHook; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.EnvironmentContext; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; @@ -69,11 +89,16 @@ import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hive.common.util.ShutdownHookManager; + +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.joda.time.DateTime; import org.joda.time.Period; import org.skife.jdbi.v2.exceptions.CallbackFailedException; @@ -83,9 +108,12 @@ import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; +import java.nio.charset.Charset; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.stream.Collectors; @@ -254,9 +282,224 @@ public void commitCreateTable(Table table) throws MetaException { // For external tables, we do not need to do anything else return; } + if(isKafkaStreamingTable(table)){ + updateKafkaIngestion(table); + } loadDruidSegments(table, true); } + private void updateKafkaIngestion(Table table){ + final String overlordAddress = HiveConf + .getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_OVERLORD_DEFAULT_ADDRESS); + + final String dataSourceName = Preconditions.checkNotNull(getTableProperty(table, Constants.DRUID_DATA_SOURCE), "Druid datasource name is null"); + + final String kafkaTopic = Preconditions.checkNotNull(getTableProperty(table, Constants.KAFKA_TOPIC), "kafka topic is null"); + final String kafka_servers = Preconditions.checkNotNull(getTableProperty(table, Constants.KAFKA_BOOTSTRAP_SERVERS), "kafka connect string is null"); + + Properties tableProperties = new Properties(); + tableProperties.putAll(table.getParameters()); + + final GranularitySpec granularitySpec = DruidStorageHandlerUtils.getGranularitySpec(getConf(), tableProperties); + + List columns = table.getSd().getCols(); + List columnNames = columns.stream().map(fieldSchema -> fieldSchema.getName()) + .collect(Collectors.toList()); + + List columnTypes = columns.stream().map(fieldSchema -> TypeInfoUtils.getTypeInfoFromTypeString(fieldSchema.getType())).collect( + Collectors.toList()); + + Pair, AggregatorFactory[]> dimensionsAndAggregates = DruidStorageHandlerUtils + .getDimensionsAndAggregates(getConf(), columnNames, columnTypes); + if (!columnNames.contains(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) { + throw new IllegalStateException( + "Timestamp column (' " + DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN + + "') not specified in create 
table; list of columns is : " + + columnNames); + } + + final InputRowParser inputRowParser = new StringInputRowParser( + new JSONParseSpec( + new TimestampSpec(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, "auto", null), + new DimensionsSpec(dimensionsAndAggregates.lhs, null, null), + null, + null + ), "UTF-8"); + + Map inputParser = DruidStorageHandlerUtils.JSON_MAPPER + .convertValue(inputRowParser, Map.class); + final DataSchema dataSchema = new DataSchema( + dataSourceName, + inputParser, + dimensionsAndAggregates.rhs, + granularitySpec, + DruidStorageHandlerUtils.JSON_MAPPER + ); + + IndexSpec indexSpec = DruidStorageHandlerUtils.getIndexSpec(getConf()); + + KafkaSupervisorSpec spec = new KafkaSupervisorSpec(dataSchema, + new KafkaSupervisorTuningConfig( + getIntegerProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxRowsInMemory"), + getIntegerProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxRowsPerSegment"), + getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "intermediatePersistPeriod"), + null, // basePersistDirectory - use druid default, no need to be configured by user + getIntegerProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "maxPendingPersists"), + indexSpec, + null, // buildV9Directly - use druid default, no need to be configured by user + getBooleanProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "reportParseExceptions"), + getLongProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "handoffConditionTimeout"), + getBooleanProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "resetOffsetAutomatically"), + getIntegerProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "workerThreads"), + getIntegerProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "chatThreads"), + getLongProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "chatRetries"), + getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "httpTimeout"), + getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "shutdownTimeout"), + getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "offsetFetchPeriod")), + new KafkaSupervisorIOConfig(kafkaTopic, // Mandatory Property + getIntegerProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "replicas"), + getIntegerProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "taskCount"), + getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "taskDuration"), + ImmutableMap.of(KafkaSupervisorIOConfig.BOOTSTRAP_SERVERS_KEY, + kafka_servers), // Mandatory Property + getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "startDelay"), + getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "period"), + getBooleanProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "useEarliestOffset"), + getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "completionTimeout"), + getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "lateMessageRejectionPeriod"), + getPeriodProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "earlyMessageRejectionPeriod"), + getBooleanProperty(table, Constants.DRUID_KAFKA_INGESTION_PROPERTY_PREFIX + "skipOffsetGaps")), + new HashMap() + ); + + // Fetch existing Ingestion Spec from Druid, if any + KafkaSupervisorSpec existingSpec = fetchKafkaIngestionSpec(table); + String targetState 
= getTableProperty(table, Constants.DRUID_KAFKA_INGESTION); + if(targetState == null){ + // Initial state of kafka ingestion, If user has not specified any state. + targetState = "STOP"; + } + // STOP Kafka Ingestion + if(targetState.equalsIgnoreCase("STOP")){ + if(existingSpec!= null){ + stopKafkaIngestion(overlordAddress, dataSourceName); + } + } else if(targetState.equalsIgnoreCase("START")){ + if(existingSpec == null || !existingSpec.equals(spec)){ + updateKafkaIngestionSpec(overlordAddress, spec); + } + } else if(targetState.equalsIgnoreCase("RESET")){ + // Case when there are changes in multiple table properties. + if(existingSpec != null && !existingSpec.equals(spec)){ + updateKafkaIngestionSpec(overlordAddress, spec); + } + resetKafkaIngestion(overlordAddress, dataSourceName); + table.getParameters().put(Constants.DRUID_KAFKA_INGESTION, "START"); + } else { + throw new IllegalArgumentException(String.format("Invalid value for property [%s], Valid values are [START, STOP, RESET]", Constants.DRUID_KAFKA_INGESTION)); + } + } + + private static void updateKafkaIngestionSpec(String overlordAddress, KafkaSupervisorSpec spec) { + try { + String task = DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(spec); + console.printInfo("submitting kafka Spec {}", task); + LOG.info("submitting kafka Supervisor Spec {}", task); + + StatusResponseHolder response = getHttpClient().go(new Request(HttpMethod.POST, + new URL(String.format("http://%s/druid/indexer/v1/supervisor", overlordAddress))) + .setContent( + "application/json", + DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsBytes(spec)), + new StatusResponseHandler( + Charset.forName("UTF-8"))).get(); + if (response.getStatus().equals(HttpResponseStatus.OK)) { + String msg = String.format("Kafka Supervisor for [%s] Submitted Successfully to druid.", spec.getDataSchema().getDataSource()); + LOG.info(msg); + console.printInfo(msg); + } else { + throw new IOException(String + .format("Unable to update Kafka Ingestion for Druid status [%d] full response [%s]", + response.getStatus().getCode(), response.getContent())); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private static void resetKafkaIngestion(String overlordAddress, String dataSourceName) { + try { + StatusResponseHolder response = getHttpClient().go(new Request(HttpMethod.POST, + new URL(String + .format("http://%s/druid/indexer/v1/supervisor/%s/reset", overlordAddress, + dataSourceName))), + new StatusResponseHandler( + Charset.forName("UTF-8"))).get(); + if (response.getStatus().equals(HttpResponseStatus.OK)) { + console.printInfo("Druid Kafka Ingestion Reset successful."); + } else { + throw new IOException(String + .format("Unable to stop Kafka Ingestion Druid status [%d] full response [%s]", + response.getStatus().getCode(), response.getContent())); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private static void stopKafkaIngestion(String overlordAddress, String dataSourceName) { + try { + StatusResponseHolder response = getHttpClient().go(new Request(HttpMethod.POST, + new URL(String + .format("http://%s/druid/indexer/v1/supervisor/%s/shutdown", overlordAddress, + dataSourceName))), + new StatusResponseHandler( + Charset.forName("UTF-8"))).get(); + if (response.getStatus().equals(HttpResponseStatus.OK)) { + console.printInfo("Druid Kafka Ingestion shutdown successful."); + } else { + throw new IOException(String + .format("Unable to stop Kafka Ingestion Druid status [%d] full response [%s]", + 
response.getStatus().getCode(), response.getContent())); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + + } + + public KafkaSupervisorSpec fetchKafkaIngestionSpec(Table table) { + // Stop Kafka Ingestion first + final String overlordAddress = Preconditions.checkNotNull(HiveConf + .getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_OVERLORD_DEFAULT_ADDRESS), + "Druid Overlord Address is null"); + String dataSourceName = Preconditions + .checkNotNull(getTableProperty(table, Constants.DRUID_DATA_SOURCE), + "Druid Datasource name is null"); + try { + StatusResponseHolder response = getHttpClient().go(new Request(HttpMethod.GET, + new URL(String + .format("http://%s/druid/indexer/v1/supervisor/%s", overlordAddress, + dataSourceName))), + new StatusResponseHandler( + Charset.forName("UTF-8"))).get(); + if (response.getStatus().equals(HttpResponseStatus.OK)) { + return DruidStorageHandlerUtils.JSON_MAPPER + .readValue(response.getContent(), KafkaSupervisorSpec.class); + // Druid Returns 400 Bad Request when not found. + } else if (response.getStatus().equals(HttpResponseStatus.NOT_FOUND) || response.getStatus().equals(HttpResponseStatus.BAD_REQUEST)) { + LOG.debug("No Kafka Supervisor found for datasource[%s]", dataSourceName); + return null; + } else { + throw new IOException(String + .format("Unable to stop Kafka Ingestion Druid status [%d] full response [%s]", + response.getStatus().getCode(), response.getContent())); + } + } catch (Exception e) { + throw new RuntimeException("Exception while fetching kafka ingestion spec from druid", e); + } + } + protected void loadDruidSegments(Table table, boolean overwrite) throws MetaException { // at this point we have Druid segments from reducers but we need to atomically @@ -457,6 +700,17 @@ public void commitDropTable(Table table, boolean deleteData) throws MetaExceptio if (MetaStoreUtils.isExternalTable(table)) { return; } + if(isKafkaStreamingTable(table)) { + // Stop Kafka Ingestion first + final String overlordAddress = Preconditions.checkNotNull(HiveConf + .getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_OVERLORD_DEFAULT_ADDRESS), + "Druid Overlord Address is null"); + String dataSourceName = Preconditions + .checkNotNull(getTableProperty(table, Constants.DRUID_DATA_SOURCE), + "Druid Datasource name is null"); + + stopKafkaIngestion(overlordAddress, dataSourceName); + } String dataSourceName = Preconditions .checkNotNull(table.getParameters().get(Constants.DRUID_DATA_SOURCE), "DataSource name is null !" @@ -688,11 +942,73 @@ public static HttpClient getHttpClient() { @Override public void preAlterTable(Table table, EnvironmentContext context) throws MetaException { - String alterOpType = context == null ? null : context.getProperties().get(ALTER_TABLE_OPERATION_TYPE); + String alterOpType = + context == null ? 
null : context.getProperties().get(ALTER_TABLE_OPERATION_TYPE); // alterOpType is null in case of stats update if (alterOpType != null && !allowedAlterTypes.contains(alterOpType)) { throw new MetaException( "ALTER TABLE can not be used for " + alterOpType + " to a non-native table "); } + if(isKafkaStreamingTable(table)){ + updateKafkaIngestion(table); + } + } + private static Boolean getBooleanProperty(Table table, String propertyName) { + String val = getTableProperty(table, propertyName); + if (val == null) { + return null; + } + return Boolean.parseBoolean(val); + } + + private static Integer getIntegerProperty(Table table, String propertyName) { + String val = getTableProperty(table, propertyName); + if (val == null) { + return null; + } + try { + return Integer.parseInt(val); + } catch (NumberFormatException e) { + throw new NumberFormatException(String + .format("Exception while parsing property[%s] with Value [%s] as Integer", propertyName, + val)); + } + } + + private static Long getLongProperty(Table table, String propertyName) { + String val = getTableProperty(table, propertyName); + if (val == null) { + return null; + } + try { + return Long.parseLong(val); + } catch (NumberFormatException e) { + throw new NumberFormatException(String + .format("Exception while parsing property[%s] with Value [%s] as Long", propertyName, + val)); + } + } + + private static Period getPeriodProperty(Table table, String propertyName) { + String val = getTableProperty(table, propertyName); + if (val == null) { + return null; + } + try { + return Period.parse(val); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException(String + .format("Exception while parsing property[%s] with Value [%s] as Period", propertyName, + val)); + } + } + + private static String getTableProperty(Table table, String propertyName) { + return table.getParameters().get(propertyName); + } + + private static boolean isKafkaStreamingTable(Table table){ + // For kafka Streaming tables it is mandatory to set a kafka topic. 
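For reference, a minimal sketch of how the new table properties above (kafka.topic, kafka.bootstrap.servers, druid.kafka.ingestion and the druid.kafka.ingestion.* tuning prefix) are intended to be used from HiveQL. Table name, column list, topic, broker address and granularity are illustrative only; the committed test DDL lives in druidkafkamini_basic.q and is not reproduced here:

CREATE TABLE druid_kafka_test (
  `__time` timestamp,
  `page` string,
  `user` string,
  `language` string,
  `added` int,
  `deleted` int)
STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
TBLPROPERTIES (
  "druid.segment.granularity" = "HOUR",
  "kafka.bootstrap.servers" = "localhost:9092",
  "kafka.topic" = "wiki_kafka_topic",
  "druid.kafka.ingestion.useEarliestOffset" = "true");

-- Ingestion is then controlled through the druid.kafka.ingestion property
-- (valid values: START, STOP, RESET); the storage handler submits, shuts down
-- or resets the corresponding Kafka supervisor on the Druid overlord configured
-- via hive.druid.overlord.address.default.
ALTER TABLE druid_kafka_test SET TBLPROPERTIES ("druid.kafka.ingestion" = "START");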
+ return getTableProperty(table, Constants.KAFKA_TOPIC) != null; } } diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java index 233b288b4d..14242378ff 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java @@ -17,20 +17,11 @@ */ package org.apache.hadoop.hive.druid; -import com.fasterxml.jackson.databind.InjectableValues; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.jsontype.NamedType; -import com.fasterxml.jackson.dataformat.smile.SmileFactory; -import com.google.common.base.Throwables; -import com.google.common.collect.Interner; -import com.google.common.collect.Interners; -import com.metamx.common.JodaUtils; -import com.metamx.emitter.EmittingLogger; -import com.metamx.emitter.core.NoopEmitter; -import com.metamx.emitter.service.ServiceEmitter; -import com.metamx.http.client.HttpClient; -import com.metamx.http.client.response.InputStreamResponseHandler; +import io.druid.data.input.impl.DimensionSchema; +import io.druid.data.input.impl.StringDimensionSchema; import io.druid.jackson.DefaultObjectMapper; +import io.druid.java.util.common.Pair; +import io.druid.java.util.common.granularity.Granularity; import io.druid.math.expr.ExprMacroTable; import io.druid.metadata.MetadataStorageTablesConfig; import io.druid.metadata.SQLMetadataConnector; @@ -47,6 +38,14 @@ import io.druid.query.select.SelectQueryConfig; import io.druid.segment.IndexIO; import io.druid.segment.IndexMergerV9; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.DoubleSumAggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.segment.IndexSpec; +import io.druid.segment.data.ConciseBitmapSerdeFactory; +import io.druid.segment.data.RoaringBitmapSerdeFactory; +import io.druid.segment.indexing.granularity.GranularitySpec; +import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.realtime.appenderator.SegmentIdentifier; import io.druid.storage.hdfs.HdfsDataSegmentPusher; @@ -64,33 +63,50 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.Constants; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.druid.serde.HiveDruidSerializationModule; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryProxy; import org.apache.hadoop.util.StringUtils; +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.dataformat.smile.SmileFactory; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Interner; +import 
com.google.common.collect.Interners; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import com.google.common.io.CharStreams; +import com.metamx.common.JodaUtils; import com.metamx.common.MapUtils; +import com.metamx.emitter.EmittingLogger; +import com.metamx.emitter.core.NoopEmitter; +import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.http.client.HttpClient; import com.metamx.http.client.Request; +import com.metamx.http.client.response.InputStreamResponseHandler; import org.jboss.netty.handler.codec.http.HttpHeaders; import org.jboss.netty.handler.codec.http.HttpMethod; import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.chrono.ISOChronology; -import org.skife.jdbi.v2.FoldController; import org.skife.jdbi.v2.Folder3; import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.PreparedBatch; import org.skife.jdbi.v2.Query; import org.skife.jdbi.v2.ResultIterator; -import org.skife.jdbi.v2.StatementContext; import org.skife.jdbi.v2.exceptions.CallbackFailedException; import org.skife.jdbi.v2.tweak.HandleCallback; import org.skife.jdbi.v2.util.ByteArrayMapper; @@ -113,6 +129,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.TimeZone; import java.util.concurrent.ExecutionException; @@ -169,7 +186,9 @@ new TrimExprMacro.BothTrimExprMacro(), new TrimExprMacro.LeftTrimExprMacro(), new TrimExprMacro.RightTrimExprMacro() - ))); + ))) + .addValue(ObjectMapper.class, JSON_MAPPER); + JSON_MAPPER.setInjectableValues(injectableValues); SMILE_MAPPER.setInjectableValues(injectableValues); HiveDruidSerializationModule hiveDruidSerializationModule = new HiveDruidSerializationModule(); @@ -177,6 +196,7 @@ SMILE_MAPPER.registerModule(hiveDruidSerializationModule); // Register the shard sub type to be used by the mapper JSON_MAPPER.registerSubtypes(new NamedType(LinearShardSpec.class, "linear")); + JSON_MAPPER.registerSubtypes(new NamedType(NumberedShardSpec.class, "numbered")); // set the timezone of the object mapper // THIS IS NOT WORKING workaround is to set it as part of java opts -Duser.timezone="UTC" JSON_MAPPER.setTimeZone(TimeZone.getTimeZone("UTC")); @@ -755,4 +775,100 @@ private static ShardSpec getNextPartitionShardSpec(ShardSpec shardSpec) { public static Path getPath(DataSegment dataSegment) { return new Path(String.valueOf(dataSegment.getLoadSpec().get("path"))); } + + public static GranularitySpec getGranularitySpec(Configuration configuration, Properties tableProperties) { + final String segmentGranularity = + tableProperties.getProperty(Constants.DRUID_SEGMENT_GRANULARITY) != null ? + tableProperties.getProperty(Constants.DRUID_SEGMENT_GRANULARITY) : + HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_DRUID_INDEXING_GRANULARITY); + return new UniformGranularitySpec( + Granularity.fromString(segmentGranularity), + Granularity.fromString( + tableProperties.getProperty(Constants.DRUID_QUERY_GRANULARITY) == null + ? 
"NONE" + : tableProperties.getProperty(Constants.DRUID_QUERY_GRANULARITY)), + null + ); + } + + public static IndexSpec getIndexSpec(Configuration jc) { + IndexSpec indexSpec; + if ("concise".equals(HiveConf.getVar(jc, HiveConf.ConfVars.HIVE_DRUID_BITMAP_FACTORY_TYPE))) { + indexSpec = new IndexSpec(new ConciseBitmapSerdeFactory(), null, null, null); + } else { + indexSpec = new IndexSpec(new RoaringBitmapSerdeFactory(true), null, null, null); + } + return indexSpec; + } + + public static Pair, AggregatorFactory[]> getDimensionsAndAggregates(Configuration jc, List columnNames, + List columnTypes) { + // Default, all columns that are not metrics or timestamp, are treated as dimensions + final List dimensions = new ArrayList<>(); + ImmutableList.Builder aggregatorFactoryBuilder = ImmutableList.builder(); + final boolean approximationAllowed = HiveConf + .getBoolVar(jc, HiveConf.ConfVars.HIVE_DRUID_APPROX_RESULT); + for (int i = 0; i < columnTypes.size(); i++) { + final PrimitiveObjectInspector.PrimitiveCategory primitiveCategory = ((PrimitiveTypeInfo) columnTypes + .get(i)).getPrimitiveCategory(); + AggregatorFactory af; + switch (primitiveCategory) { + case BYTE: + case SHORT: + case INT: + case LONG: + af = new LongSumAggregatorFactory(columnNames.get(i), columnNames.get(i)); + break; + case FLOAT: + case DOUBLE: + af = new DoubleSumAggregatorFactory(columnNames.get(i), columnNames.get(i)); + break; + case DECIMAL: + if (approximationAllowed) { + af = new DoubleSumAggregatorFactory(columnNames.get(i), columnNames.get(i)); + } else { + throw new UnsupportedOperationException( + String.format("Druid does not support decimal column type." + + "Either cast column [%s] to double or Enable Approximate Result for Druid by setting property [%s] to true", + columnNames.get(i), HiveConf.ConfVars.HIVE_DRUID_APPROX_RESULT.varname)); + } + break; + case TIMESTAMP: + // Granularity column + String tColumnName = columnNames.get(i); + if (!tColumnName.equals(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME) && + !tColumnName.equals(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) { + throw new IllegalArgumentException( + "Dimension " + tColumnName + " does not have STRING type: " + + primitiveCategory); + } + continue; + case TIMESTAMPLOCALTZ: + // Druid timestamp column + String tLocalTZColumnName = columnNames.get(i); + if (!tLocalTZColumnName.equals(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) { + throw new IllegalArgumentException( + "Dimension " + tLocalTZColumnName + " does not have STRING type: " + + primitiveCategory); + } + continue; + default: + // Dimension + String dColumnName = columnNames.get(i); + if (PrimitiveObjectInspectorUtils.getPrimitiveGrouping(primitiveCategory) != + PrimitiveObjectInspectorUtils.PrimitiveGrouping.STRING_GROUP + && primitiveCategory != PrimitiveObjectInspector.PrimitiveCategory.BOOLEAN) { + throw new IllegalArgumentException( + "Dimension " + dColumnName + " does not have STRING type: " + + primitiveCategory); + } + dimensions.add(new StringDimensionSchema(dColumnName)); + continue; + } + aggregatorFactoryBuilder.add(af); + } + ImmutableList aggregatorFactories = aggregatorFactoryBuilder.build(); + return Pair.of(dimensions, + aggregatorFactories.toArray(new AggregatorFactory[aggregatorFactories.size()])); + } } diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java index b758efd6e2..15a08eb4d3 100644 --- 
a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidOutputFormat.java @@ -17,28 +17,18 @@ */ package org.apache.hadoop.hive.druid.io; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; import io.druid.data.input.impl.DimensionSchema; import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.InputRowParser; import io.druid.data.input.impl.MapInputRowParser; -import io.druid.data.input.impl.StringDimensionSchema; import io.druid.data.input.impl.TimeAndDimsParseSpec; import io.druid.data.input.impl.TimestampSpec; -import io.druid.java.util.common.granularity.Granularity; +import io.druid.java.util.common.Pair; import io.druid.query.aggregation.AggregatorFactory; -import io.druid.query.aggregation.DoubleSumAggregatorFactory; -import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.segment.IndexSpec; -import io.druid.segment.data.ConciseBitmapSerdeFactory; -import io.druid.segment.data.RoaringBitmapSerdeFactory; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.indexing.granularity.GranularitySpec; -import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.realtime.plumber.CustomVersioningPolicy; import org.apache.commons.lang.StringUtils; @@ -51,10 +41,6 @@ import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.hive.serde.serdeConstants; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.PrimitiveGrouping; -import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.io.Writable; @@ -62,6 +48,12 @@ import org.apache.hadoop.mapred.RecordWriter; import org.apache.hadoop.util.Progressable; +import static org.apache.hadoop.hive.druid.DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,8 +64,6 @@ import java.util.Map; import java.util.Properties; -import static org.apache.hadoop.hive.druid.DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME; - public class DruidOutputFormat implements HiveOutputFormat { protected static final Logger LOG = LoggerFactory.getLogger(DruidOutputFormat.class); @@ -88,10 +78,7 @@ Progressable progress ) throws IOException { - final String segmentGranularity = - tableProperties.getProperty(Constants.DRUID_SEGMENT_GRANULARITY) != null ? - tableProperties.getProperty(Constants.DRUID_SEGMENT_GRANULARITY) : - HiveConf.getVar(jc, HiveConf.ConfVars.HIVE_DRUID_INDEXING_GRANULARITY); + final int targetNumShardsPerGranularity = Integer.parseUnsignedInt( tableProperties.getProperty(Constants.DRUID_TARGET_SHARDS_PER_GRANULARITY, "0")); final int maxPartitionSize = targetNumShardsPerGranularity > 0 ? 
-1 : HiveConf @@ -104,14 +91,7 @@ : tableProperties.getProperty(Constants.DRUID_DATA_SOURCE); final String segmentDirectory = jc.get(Constants.DRUID_SEGMENT_INTERMEDIATE_DIRECTORY); - final GranularitySpec granularitySpec = new UniformGranularitySpec( - Granularity.fromString(segmentGranularity), - Granularity.fromString( - tableProperties.getProperty(Constants.DRUID_QUERY_GRANULARITY) == null - ? "NONE" - : tableProperties.getProperty(Constants.DRUID_QUERY_GRANULARITY)), - null - ); + final GranularitySpec granularitySpec = DruidStorageHandlerUtils.getGranularitySpec(jc, tableProperties); final String columnNameProperty = tableProperties.getProperty(serdeConstants.LIST_COLUMNS); final String columnTypeProperty = tableProperties.getProperty(serdeConstants.LIST_COLUMN_TYPES); @@ -122,10 +102,7 @@ columnNameProperty, columnTypeProperty )); } - ArrayList columnNames = new ArrayList(); - for (String name : columnNameProperty.split(",")) { - columnNames.add(name); - } + ArrayList columnNames = Lists.newArrayList(columnNameProperty.split(",")); if (!columnNames.contains(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) { throw new IllegalStateException("Timestamp column (' " + DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN + "') not specified in create table; list of columns is : " + @@ -133,69 +110,11 @@ } ArrayList columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty); - final boolean approximationAllowed = HiveConf.getBoolVar(jc, HiveConf.ConfVars.HIVE_DRUID_APPROX_RESULT); - // Default, all columns that are not metrics or timestamp, are treated as dimensions - final List dimensions = new ArrayList<>(); - ImmutableList.Builder aggregatorFactoryBuilder = ImmutableList.builder(); - for (int i = 0; i < columnTypes.size(); i++) { - final PrimitiveObjectInspector.PrimitiveCategory primitiveCategory = ((PrimitiveTypeInfo) columnTypes - .get(i)).getPrimitiveCategory(); - AggregatorFactory af; - switch (primitiveCategory) { - case BYTE: - case SHORT: - case INT: - case LONG: - af = new LongSumAggregatorFactory(columnNames.get(i), columnNames.get(i)); - break; - case FLOAT: - case DOUBLE: - af = new DoubleSumAggregatorFactory(columnNames.get(i), columnNames.get(i)); - break; - case DECIMAL: - if (approximationAllowed) { - af = new DoubleSumAggregatorFactory(columnNames.get(i), columnNames.get(i)); - } else { - throw new UnsupportedOperationException( - String.format("Druid does not support decimal column type." 
+ - "Either cast column [%s] to double or Enable Approximate Result for Druid by setting property [%s] to true", - columnNames.get(i), HiveConf.ConfVars.HIVE_DRUID_APPROX_RESULT.varname)); - } - break; - case TIMESTAMP: - // Granularity column - String tColumnName = columnNames.get(i); - if (!tColumnName.equals(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME)) { - throw new IOException("Dimension " + tColumnName + " does not have STRING type: " + - primitiveCategory); - } - continue; - case TIMESTAMPLOCALTZ: - // Druid timestamp column - String tLocalTZColumnName = columnNames.get(i); - if (!tLocalTZColumnName.equals(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN)) { - throw new IOException("Dimension " + tLocalTZColumnName + " does not have STRING type: " + - primitiveCategory); - } - continue; - default: - // Dimension - String dColumnName = columnNames.get(i); - if (PrimitiveObjectInspectorUtils.getPrimitiveGrouping(primitiveCategory) != - PrimitiveGrouping.STRING_GROUP - && primitiveCategory != PrimitiveObjectInspector.PrimitiveCategory.BOOLEAN) { - throw new IOException("Dimension " + dColumnName + " does not have STRING type: " + - primitiveCategory); - } - dimensions.add(new StringDimensionSchema(dColumnName)); - continue; - } - aggregatorFactoryBuilder.add(af); - } - List aggregatorFactories = aggregatorFactoryBuilder.build(); + Pair, AggregatorFactory[]> dimensionsAndAggregates = DruidStorageHandlerUtils + .getDimensionsAndAggregates(jc, columnNames, columnTypes); final InputRowParser inputRowParser = new MapInputRowParser(new TimeAndDimsParseSpec( new TimestampSpec(DruidStorageHandlerUtils.DEFAULT_TIMESTAMP_COLUMN, "auto", null), - new DimensionsSpec(dimensions, Lists + new DimensionsSpec(dimensionsAndAggregates.lhs, Lists .newArrayList(Constants.DRUID_TIMESTAMP_GRANULARITY_COL_NAME, Constants.DRUID_SHARD_KEY_COL_NAME ), null @@ -208,7 +127,7 @@ final DataSchema dataSchema = new DataSchema( Preconditions.checkNotNull(dataSource, "Data source name is null"), inputParser, - aggregatorFactories.toArray(new AggregatorFactory[aggregatorFactories.size()]), + dimensionsAndAggregates.rhs, granularitySpec, DruidStorageHandlerUtils.JSON_MAPPER ); @@ -222,12 +141,7 @@ } Integer maxRowInMemory = HiveConf.getIntVar(jc, HiveConf.ConfVars.HIVE_DRUID_MAX_ROW_IN_MEMORY); - IndexSpec indexSpec; - if ("concise".equals(HiveConf.getVar(jc, HiveConf.ConfVars.HIVE_DRUID_BITMAP_FACTORY_TYPE))) { - indexSpec = new IndexSpec(new ConciseBitmapSerdeFactory(), null, null, null); - } else { - indexSpec = new IndexSpec(new RoaringBitmapSerdeFactory(true), null, null, null); - } + IndexSpec indexSpec = DruidStorageHandlerUtils.getIndexSpec(jc); RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig(maxRowInMemory, null, null, diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorIOConfig.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorIOConfig.java new file mode 100644 index 0000000000..425a5bbfc5 --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorIOConfig.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.druid.json; + +import io.druid.java.util.common.StringUtils; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; + +import org.joda.time.Duration; +import org.joda.time.Period; + +import java.util.Map; + +/** + * This class is copied from druid source code + * in order to avoid adding additional dependencies on druid-indexing-service. + */ +public class KafkaSupervisorIOConfig +{ + public static final String BOOTSTRAP_SERVERS_KEY = "bootstrap.servers"; + + private final String topic; + private final Integer replicas; + private final Integer taskCount; + private final Duration taskDuration; + private final Map consumerProperties; + private final Duration startDelay; + private final Duration period; + private final boolean useEarliestOffset; + private final Duration completionTimeout; + private final Optional lateMessageRejectionPeriod; + private final Optional earlyMessageRejectionPeriod; + private final boolean skipOffsetGaps; + + @JsonCreator + public KafkaSupervisorIOConfig( + @JsonProperty("topic") String topic, + @JsonProperty("replicas") Integer replicas, + @JsonProperty("taskCount") Integer taskCount, + @JsonProperty("taskDuration") Period taskDuration, + @JsonProperty("consumerProperties") Map consumerProperties, + @JsonProperty("startDelay") Period startDelay, + @JsonProperty("period") Period period, + @JsonProperty("useEarliestOffset") Boolean useEarliestOffset, + @JsonProperty("completionTimeout") Period completionTimeout, + @JsonProperty("lateMessageRejectionPeriod") Period lateMessageRejectionPeriod, + @JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod, + @JsonProperty("skipOffsetGaps") Boolean skipOffsetGaps + ) + { + this.topic = Preconditions.checkNotNull(topic, "topic"); + this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties"); + Preconditions.checkNotNull( + consumerProperties.get(BOOTSTRAP_SERVERS_KEY), + StringUtils.format("consumerProperties must contain entry for [%s]", BOOTSTRAP_SERVERS_KEY) + ); + + this.replicas = replicas != null ? replicas : 1; + this.taskCount = taskCount != null ? taskCount : 1; + this.taskDuration = defaultDuration(taskDuration, "PT1H"); + this.startDelay = defaultDuration(startDelay, "PT5S"); + this.period = defaultDuration(period, "PT30S"); + this.useEarliestOffset = useEarliestOffset != null ? useEarliestOffset : false; + this.completionTimeout = defaultDuration(completionTimeout, "PT30M"); + this.lateMessageRejectionPeriod = lateMessageRejectionPeriod == null + ? Optional.absent() + : Optional.of(lateMessageRejectionPeriod.toStandardDuration()); + this.earlyMessageRejectionPeriod = earlyMessageRejectionPeriod == null + ? Optional.absent() + : Optional.of(earlyMessageRejectionPeriod.toStandardDuration()); + this.skipOffsetGaps = skipOffsetGaps != null ? 
skipOffsetGaps : false; + } + + @JsonProperty + public String getTopic() + { + return topic; + } + + @JsonProperty + public Integer getReplicas() + { + return replicas; + } + + @JsonProperty + public Integer getTaskCount() + { + return taskCount; + } + + @JsonProperty + public Duration getTaskDuration() + { + return taskDuration; + } + + @JsonProperty + public Map getConsumerProperties() + { + return consumerProperties; + } + + @JsonProperty + public Duration getStartDelay() + { + return startDelay; + } + + @JsonProperty + public Duration getPeriod() + { + return period; + } + + @JsonProperty + public boolean isUseEarliestOffset() + { + return useEarliestOffset; + } + + @JsonProperty + public Duration getCompletionTimeout() + { + return completionTimeout; + } + + @JsonProperty + public Optional getEarlyMessageRejectionPeriod() + { + return earlyMessageRejectionPeriod; + } + + @JsonProperty + public Optional getLateMessageRejectionPeriod() + { + return lateMessageRejectionPeriod; + } + + @JsonProperty + public boolean isSkipOffsetGaps() + { + return skipOffsetGaps; + } + + @Override + public String toString() + { + return "KafkaSupervisorIOConfig{" + + "topic='" + topic + '\'' + + ", replicas=" + replicas + + ", taskCount=" + taskCount + + ", taskDuration=" + taskDuration + + ", consumerProperties=" + consumerProperties + + ", startDelay=" + startDelay + + ", period=" + period + + ", useEarliestOffset=" + useEarliestOffset + + ", completionTimeout=" + completionTimeout + + ", lateMessageRejectionPeriod=" + lateMessageRejectionPeriod + + ", skipOffsetGaps=" + skipOffsetGaps + + '}'; + } + + private static Duration defaultDuration(final Period period, final String theDefault) + { + return (period == null ? new Period(theDefault) : period).toStandardDuration(); + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + KafkaSupervisorIOConfig that = (KafkaSupervisorIOConfig) o; + + if (useEarliestOffset != that.useEarliestOffset) + return false; + if (skipOffsetGaps != that.skipOffsetGaps) + return false; + if (topic != null ? !topic.equals(that.topic) : that.topic != null) + return false; + if (replicas != null ? !replicas.equals(that.replicas) : that.replicas != null) + return false; + if (taskCount != null ? !taskCount.equals(that.taskCount) : that.taskCount != null) + return false; + if (taskDuration != null ? !taskDuration.equals(that.taskDuration) : that.taskDuration != null) + return false; + if (consumerProperties != null ? + !consumerProperties.equals(that.consumerProperties) : + that.consumerProperties != null) + return false; + if (startDelay != null ? !startDelay.equals(that.startDelay) : that.startDelay != null) + return false; + if (period != null ? !period.equals(that.period) : that.period != null) + return false; + if (completionTimeout != null ? + !completionTimeout.equals(that.completionTimeout) : + that.completionTimeout != null) + return false; + if (lateMessageRejectionPeriod != null ? + !lateMessageRejectionPeriod.equals(that.lateMessageRejectionPeriod) : + that.lateMessageRejectionPeriod != null) + return false; + return earlyMessageRejectionPeriod != null ? + earlyMessageRejectionPeriod.equals(that.earlyMessageRejectionPeriod) : + that.earlyMessageRejectionPeriod == null; + } + + @Override + public int hashCode() { + int result = topic != null ? topic.hashCode() : 0; + result = 31 * result + (replicas != null ? 
replicas.hashCode() : 0); + result = 31 * result + (taskCount != null ? taskCount.hashCode() : 0); + result = 31 * result + (taskDuration != null ? taskDuration.hashCode() : 0); + result = 31 * result + (consumerProperties != null ? consumerProperties.hashCode() : 0); + result = 31 * result + (startDelay != null ? startDelay.hashCode() : 0); + result = 31 * result + (period != null ? period.hashCode() : 0); + result = 31 * result + (useEarliestOffset ? 1 : 0); + result = 31 * result + (completionTimeout != null ? completionTimeout.hashCode() : 0); + result = 31 * result + + (lateMessageRejectionPeriod != null ? lateMessageRejectionPeriod.hashCode() : 0); + result = 31 * result + + (earlyMessageRejectionPeriod != null ? earlyMessageRejectionPeriod.hashCode() : 0); + result = 31 * result + (skipOffsetGaps ? 1 : 0); + return result; + } +} diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorSpec.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorSpec.java new file mode 100644 index 0000000000..081bc279b8 --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorSpec.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.druid.json; + +import io.druid.segment.indexing.DataSchema; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.google.common.base.Preconditions; + +import java.util.Map; + +/** + * This class is copied from druid source code + * in order to avoid adding additional dependencies on druid-indexing-service. + */ +@JsonTypeInfo( + use = JsonTypeInfo.Id.NAME, + property = "type" +) +@JsonSubTypes({@JsonSubTypes.Type( + name = "kafka", + value = KafkaSupervisorSpec.class +)}) +public class KafkaSupervisorSpec +{ + private final DataSchema dataSchema; + private final KafkaSupervisorTuningConfig tuningConfig; + private final KafkaSupervisorIOConfig ioConfig; + private final Map context; + + @JsonCreator + public KafkaSupervisorSpec( + @JsonProperty("dataSchema") DataSchema dataSchema, + @JsonProperty("tuningConfig") KafkaSupervisorTuningConfig tuningConfig, + @JsonProperty("ioConfig") KafkaSupervisorIOConfig ioConfig, + @JsonProperty("context") Map context + ) + { + this.dataSchema = Preconditions.checkNotNull(dataSchema, "dataSchema"); + this.tuningConfig = tuningConfig != null + ? tuningConfig + : new KafkaSupervisorTuningConfig( + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig"); + this.context = context; + } + + @JsonProperty + public DataSchema getDataSchema() + { + return dataSchema; + } + + @JsonProperty + public KafkaSupervisorTuningConfig getTuningConfig() + { + return tuningConfig; + } + + @JsonProperty + public KafkaSupervisorIOConfig getIoConfig() + { + return ioConfig; + } + + @JsonProperty + public Map getContext() + { + return context; + } + + @Override + public String toString() + { + return "KafkaSupervisorSpec{" + + "dataSchema=" + dataSchema + + ", tuningConfig=" + tuningConfig + + ", ioConfig=" + ioConfig + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + KafkaSupervisorSpec that = (KafkaSupervisorSpec) o; + + if (dataSchema != null ? !dataSchema.equals(that.dataSchema) : that.dataSchema != null) + return false; + if (tuningConfig != null ? !tuningConfig.equals(that.tuningConfig) : that.tuningConfig != null) + return false; + if (ioConfig != null ? !ioConfig.equals(that.ioConfig) : that.ioConfig != null) + return false; + return context != null ? context.equals(that.context) : that.context == null; + } + + @Override + public int hashCode() { + int result = dataSchema != null ? dataSchema.hashCode() : 0; + result = 31 * result + (tuningConfig != null ? tuningConfig.hashCode() : 0); + result = 31 * result + (ioConfig != null ? ioConfig.hashCode() : 0); + result = 31 * result + (context != null ? 
context.hashCode() : 0); + return result; + } +} diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorTuningConfig.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorTuningConfig.java new file mode 100644 index 0000000000..a918df475d --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorTuningConfig.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.druid.json; + +import io.druid.segment.IndexSpec; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +import org.joda.time.Duration; +import org.joda.time.Period; + +import java.io.File; + +/** + * This class is copied from druid source code + * in order to avoid adding additional dependencies on druid-indexing-service. + */ +@JsonTypeInfo( + use = JsonTypeInfo.Id.NAME, + property = "type" +) +@JsonSubTypes({@JsonSubTypes.Type( + name = "kafka", + value = KafkaSupervisorTuningConfig.class +)}) +public class KafkaSupervisorTuningConfig extends KafkaTuningConfig +{ + private final Integer workerThreads; + private final Integer chatThreads; + private final Long chatRetries; + private final Duration httpTimeout; + private final Duration shutdownTimeout; + private final Duration offsetFetchPeriod; + + public KafkaSupervisorTuningConfig( + @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, + @JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment, + @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod, + @JsonProperty("basePersistDirectory") File basePersistDirectory, + @JsonProperty("maxPendingPersists") Integer maxPendingPersists, + @JsonProperty("indexSpec") IndexSpec indexSpec, + // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12. + @JsonProperty("buildV9Directly") Boolean buildV9Directly, + @JsonProperty("reportParseExceptions") Boolean reportParseExceptions, + @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout, // for backward compatibility + @JsonProperty("resetOffsetAutomatically") Boolean resetOffsetAutomatically, + @JsonProperty("workerThreads") Integer workerThreads, + @JsonProperty("chatThreads") Integer chatThreads, + @JsonProperty("chatRetries") Long chatRetries, + @JsonProperty("httpTimeout") Period httpTimeout, + @JsonProperty("shutdownTimeout") Period shutdownTimeout, + @JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod + ) + { + super( + maxRowsInMemory, + maxRowsPerSegment, + intermediatePersistPeriod, + basePersistDirectory, + maxPendingPersists, + indexSpec, + true, + reportParseExceptions, + // Supervised kafka tasks should respect KafkaSupervisorIOConfig.completionTimeout instead of + // handoffConditionTimeout + handoffConditionTimeout, + resetOffsetAutomatically + ); + + this.workerThreads = workerThreads; + this.chatThreads = chatThreads; + this.chatRetries = (chatRetries != null ? 
chatRetries : 8); + this.httpTimeout = defaultDuration(httpTimeout, "PT10S"); + this.shutdownTimeout = defaultDuration(shutdownTimeout, "PT80S"); + this.offsetFetchPeriod = defaultDuration(offsetFetchPeriod, "PT30S"); + } + + @JsonProperty + public Integer getWorkerThreads() + { + return workerThreads; + } + + @JsonProperty + public Integer getChatThreads() + { + return chatThreads; + } + + @JsonProperty + public Long getChatRetries() + { + return chatRetries; + } + + @JsonProperty + public Duration getHttpTimeout() + { + return httpTimeout; + } + + @JsonProperty + public Duration getShutdownTimeout() + { + return shutdownTimeout; + } + + @JsonProperty + public Duration getOffsetFetchPeriod() + { + return offsetFetchPeriod; + } + + @Override + public String toString() + { + return "KafkaSupervisorTuningConfig{" + + "maxRowsInMemory=" + getMaxRowsInMemory() + + ", maxRowsPerSegment=" + getMaxRowsPerSegment() + + ", intermediatePersistPeriod=" + getIntermediatePersistPeriod() + + ", basePersistDirectory=" + getBasePersistDirectory() + + ", maxPendingPersists=" + getMaxPendingPersists() + + ", indexSpec=" + getIndexSpec() + + ", reportParseExceptions=" + isReportParseExceptions() + + ", handoffConditionTimeout=" + getHandoffConditionTimeout() + + ", resetOffsetAutomatically=" + isResetOffsetAutomatically() + + ", workerThreads=" + workerThreads + + ", chatThreads=" + chatThreads + + ", chatRetries=" + chatRetries + + ", httpTimeout=" + httpTimeout + + ", shutdownTimeout=" + shutdownTimeout + + ", offsetFetchPeriod=" + offsetFetchPeriod + + '}'; + } + + private static Duration defaultDuration(final Period period, final String theDefault) + { + return (period == null ? new Period(theDefault) : period).toStandardDuration(); + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + if (!super.equals(o)) + return false; + + KafkaSupervisorTuningConfig that = (KafkaSupervisorTuningConfig) o; + + if (workerThreads != null ? + !workerThreads.equals(that.workerThreads) : + that.workerThreads != null) + return false; + if (chatThreads != null ? !chatThreads.equals(that.chatThreads) : that.chatThreads != null) + return false; + if (chatRetries != null ? !chatRetries.equals(that.chatRetries) : that.chatRetries != null) + return false; + if (httpTimeout != null ? !httpTimeout.equals(that.httpTimeout) : that.httpTimeout != null) + return false; + if (shutdownTimeout != null ? + !shutdownTimeout.equals(that.shutdownTimeout) : + that.shutdownTimeout != null) + return false; + return offsetFetchPeriod != null ? + offsetFetchPeriod.equals(that.offsetFetchPeriod) : + that.offsetFetchPeriod == null; + } + + @Override + public int hashCode() { + int result = super.hashCode(); + result = 31 * result + (workerThreads != null ? workerThreads.hashCode() : 0); + result = 31 * result + (chatThreads != null ? chatThreads.hashCode() : 0); + result = 31 * result + (chatRetries != null ? chatRetries.hashCode() : 0); + result = 31 * result + (httpTimeout != null ? httpTimeout.hashCode() : 0); + result = 31 * result + (shutdownTimeout != null ? shutdownTimeout.hashCode() : 0); + result = 31 * result + (offsetFetchPeriod != null ? 
offsetFetchPeriod.hashCode() : 0); + return result; + } +} diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java new file mode 100644 index 0000000000..ea23dddf59 --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.druid.json; + +import io.druid.segment.IndexSpec; +import io.druid.segment.indexing.RealtimeTuningConfig; +import io.druid.segment.realtime.appenderator.AppenderatorConfig; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +import org.joda.time.Period; + +import java.io.File; + +/** + * This class is copied from druid source code + * in order to avoid adding additional dependencies on druid-indexing-service. + */ +public class KafkaTuningConfig implements AppenderatorConfig +{ + private static final int DEFAULT_MAX_ROWS_PER_SEGMENT = 5_000_000; + private static final boolean DEFAULT_RESET_OFFSET_AUTOMATICALLY = false; + + private final int maxRowsInMemory; + private final int maxRowsPerSegment; + private final Period intermediatePersistPeriod; + private final File basePersistDirectory; + private final int maxPendingPersists; + private final IndexSpec indexSpec; + private final boolean reportParseExceptions; + @Deprecated + private final long handoffConditionTimeout; + private final boolean resetOffsetAutomatically; + + @JsonCreator + public KafkaTuningConfig( + @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, + @JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment, + @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod, + @JsonProperty("basePersistDirectory") File basePersistDirectory, + @JsonProperty("maxPendingPersists") Integer maxPendingPersists, + @JsonProperty("indexSpec") IndexSpec indexSpec, + // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12. + @JsonProperty("buildV9Directly") Boolean buildV9Directly, + @JsonProperty("reportParseExceptions") Boolean reportParseExceptions, + @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout, + @JsonProperty("resetOffsetAutomatically") Boolean resetOffsetAutomatically + ) + { + // Cannot be a static because default basePersistDirectory is unique per-instance + final RealtimeTuningConfig defaults = RealtimeTuningConfig.makeDefaultTuningConfig(basePersistDirectory); + + this.maxRowsInMemory = maxRowsInMemory == null ? defaults.getMaxRowsInMemory() : maxRowsInMemory; + this.maxRowsPerSegment = maxRowsPerSegment == null ? DEFAULT_MAX_ROWS_PER_SEGMENT : maxRowsPerSegment; + this.intermediatePersistPeriod = intermediatePersistPeriod == null + ? defaults.getIntermediatePersistPeriod() + : intermediatePersistPeriod; + this.basePersistDirectory = defaults.getBasePersistDirectory(); + this.maxPendingPersists = maxPendingPersists == null ? defaults.getMaxPendingPersists() : maxPendingPersists; + this.indexSpec = indexSpec == null ? defaults.getIndexSpec() : indexSpec; + this.reportParseExceptions = reportParseExceptions == null + ? defaults.isReportParseExceptions() + : reportParseExceptions; + this.handoffConditionTimeout = handoffConditionTimeout == null + ? defaults.getHandoffConditionTimeout() + : handoffConditionTimeout; + this.resetOffsetAutomatically = resetOffsetAutomatically == null + ? 
DEFAULT_RESET_OFFSET_AUTOMATICALLY + : resetOffsetAutomatically; + } + + public static KafkaTuningConfig copyOf(KafkaTuningConfig config) + { + return new KafkaTuningConfig( + config.maxRowsInMemory, + config.maxRowsPerSegment, + config.intermediatePersistPeriod, + config.basePersistDirectory, + config.maxPendingPersists, + config.indexSpec, + true, + config.reportParseExceptions, + config.handoffConditionTimeout, + config.resetOffsetAutomatically + ); + } + + @Override + @JsonProperty + public int getMaxRowsInMemory() + { + return maxRowsInMemory; + } + + @JsonProperty + public int getMaxRowsPerSegment() + { + return maxRowsPerSegment; + } + + @Override + @JsonProperty + public Period getIntermediatePersistPeriod() + { + return intermediatePersistPeriod; + } + + @Override + @JsonProperty + public File getBasePersistDirectory() + { + return basePersistDirectory; + } + + @Override + @JsonProperty + public int getMaxPendingPersists() + { + return maxPendingPersists; + } + + @Override + @JsonProperty + public IndexSpec getIndexSpec() + { + return indexSpec; + } + + /** + * Always returns true, doesn't affect the version being built. + */ + @Deprecated + @JsonProperty + public boolean getBuildV9Directly() + { + return true; + } + + @Override + @JsonProperty + public boolean isReportParseExceptions() + { + return reportParseExceptions; + } + + @Deprecated + @JsonProperty + public long getHandoffConditionTimeout() + { + return handoffConditionTimeout; + } + + @JsonProperty + public boolean isResetOffsetAutomatically() + { + return resetOffsetAutomatically; + } + + public KafkaTuningConfig withBasePersistDirectory(File dir) + { + return new KafkaTuningConfig( + maxRowsInMemory, + maxRowsPerSegment, + intermediatePersistPeriod, + dir, + maxPendingPersists, + indexSpec, + true, + reportParseExceptions, + handoffConditionTimeout, + resetOffsetAutomatically + ); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + KafkaTuningConfig that = (KafkaTuningConfig) o; + + if (maxRowsInMemory != that.maxRowsInMemory) { + return false; + } + if (maxRowsPerSegment != that.maxRowsPerSegment) { + return false; + } + if (maxPendingPersists != that.maxPendingPersists) { + return false; + } + if (reportParseExceptions != that.reportParseExceptions) { + return false; + } + if (handoffConditionTimeout != that.handoffConditionTimeout) { + return false; + } + if (resetOffsetAutomatically != that.resetOffsetAutomatically) { + return false; + } + if (intermediatePersistPeriod != null + ? !intermediatePersistPeriod.equals(that.intermediatePersistPeriod) + : that.intermediatePersistPeriod != null) { + return false; + } + if (basePersistDirectory != null + ? !basePersistDirectory.equals(that.basePersistDirectory) + : that.basePersistDirectory != null) { + return false; + } + return indexSpec != null ? indexSpec.equals(that.indexSpec) : that.indexSpec == null; + + } + + @Override + public int hashCode() + { + int result = maxRowsInMemory; + result = 31 * result + maxRowsPerSegment; + result = 31 * result + (intermediatePersistPeriod != null ? intermediatePersistPeriod.hashCode() : 0); + result = 31 * result + (basePersistDirectory != null ? basePersistDirectory.hashCode() : 0); + result = 31 * result + maxPendingPersists; + result = 31 * result + (indexSpec != null ? indexSpec.hashCode() : 0); + result = 31 * result + (reportParseExceptions ? 
1 : 0); + result = 31 * result + (int) (handoffConditionTimeout ^ (handoffConditionTimeout >>> 32)); + result = 31 * result + (resetOffsetAutomatically ? 1 : 0); + return result; + } + + @Override + public String toString() + { + return "KafkaTuningConfig{" + + "maxRowsInMemory=" + maxRowsInMemory + + ", maxRowsPerSegment=" + maxRowsPerSegment + + ", intermediatePersistPeriod=" + intermediatePersistPeriod + + ", basePersistDirectory=" + basePersistDirectory + + ", maxPendingPersists=" + maxPendingPersists + + ", indexSpec=" + indexSpec + + ", reportParseExceptions=" + reportParseExceptions + + ", handoffConditionTimeout=" + handoffConditionTimeout + + ", resetOffsetAutomatically=" + resetOffsetAutomatically + + '}'; + } +} diff --git a/itests/qtest-druid/pom.xml b/itests/qtest-druid/pom.xml index 20b7ea9e3e..2e19ce5b4c 100644 --- a/itests/qtest-druid/pom.xml +++ b/itests/qtest-druid/pom.xml @@ -43,6 +43,7 @@ 10.11.1.1 16.0.1 4.1.0 + 0.10.2.0 @@ -104,6 +105,11 @@ + io.druid.extensions + druid-kafka-indexing-service + ${druid.version} + + org.apache.logging.log4j log4j-api ${log4j2.version} @@ -196,6 +202,11 @@ ${junit.version} test + + org.apache.kafka + kafka_2.11 + ${kafka.version} + diff --git a/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java b/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java index 71259dc914..8e79d4cdd9 100644 --- a/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java +++ b/itests/qtest-druid/src/main/java/org/apache/hive/druid/MiniDruidCluster.java @@ -21,7 +21,6 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import org.apache.commons.io.FileUtils; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.service.AbstractService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,12 +50,13 @@ ); private static final Map COMMON_DRUID_CONF = ImmutableMap.of( - "druid.metadata.storage.type", "derby" + "druid.metadata.storage.type", "derby", + "druid.storage.type", "hdfs", + "druid.processing.buffer.sizeBytes", "213870912", + "druid.processing.numThreads", "2" ); private static final Map COMMON_DRUID_HISTORICAL = ImmutableMap.of( - "druid.processing.buffer.sizeBytes", "213870912", - "druid.processing.numThreads", "2", "druid.server.maxSize", "130000000000" ); @@ -87,26 +87,13 @@ public MiniDruidCluster(String name) { } - public MiniDruidCluster(String name, String logDir, String dataDir, Integer zookeeperPort, String classpath) { + public MiniDruidCluster(String name, String logDir, String tmpDir, Integer zookeeperPort, String classpath) { super(name); - this.dataDirectory = new File(dataDir, "druid-data"); + this.dataDirectory = new File(tmpDir, "druid-data"); this.logDirectory = new File(logDir); - try { - if (dataDirectory.exists()) { - // need to clean data directory to ensure that there is no interference from old runs - // Cleaning is happening here to allow debugging in case of tests fail - // we don;t have to clean logs since it is an append mode - log.info("Cleaning the druid-data directory [{}]", dataDirectory.getAbsolutePath()); - FileUtils.deleteDirectory(dataDirectory); - } else { - log.info("Creating the druid-data directory [{}]", dataDirectory.getAbsolutePath()); - dataDirectory.mkdirs(); - } - } catch (IOException e) { - log.error("Failed to clean data directory"); - Throwables.propagate(e); - } + ensureCleanDirectory(dataDirectory); + String derbyURI = String 
.format("jdbc:derby://localhost:1527/%s/druid_derby/metadata.db;create=true", dataDirectory.getAbsolutePath() @@ -126,13 +113,14 @@ public MiniDruidCluster(String name, String logDir, String dataDir, Integer zook .put("druid.indexer.logs.directory", indexingLogDir) .put("druid.zk.service.host", "localhost:" + zookeeperPort) .put("druid.coordinator.startDelay", "PT1S") + .put("druid.indexer.runner", "local") + .put("druid.storage.storageDirectory", getDeepStorageDir()) .build(); Map historicalProperties = historicalMapBuilder.putAll(COMMON_DRUID_CONF) .putAll(COMMON_DRUID_HISTORICAL) .put("druid.zk.service.host", "localhost:" + zookeeperPort) .put("druid.segmentCache.locations", segmentsCache) .put("druid.storage.storageDirectory", getDeepStorageDir()) - .put("druid.storage.type", "hdfs") .build(); coordinator = new ForkingDruidNode("coordinator", classpath, coordinatorProperties, COORDINATOR_JVM_CONF, @@ -148,6 +136,24 @@ public MiniDruidCluster(String name, String logDir, String dataDir, Integer zook } + private static void ensureCleanDirectory(File dir){ + try { + if (dir.exists()) { + // need to clean data directory to ensure that there is no interference from old runs + // Cleaning is happening here to allow debugging in case of tests fail + // we don;t have to clean logs since it is an append mode + log.info("Cleaning the druid directory [{}]", dir.getAbsolutePath()); + FileUtils.deleteDirectory(dir); + } else { + log.info("Creating the druid directory [{}]", dir.getAbsolutePath()); + dir.mkdirs(); + } + } catch (IOException e) { + log.error("Failed to clean druid directory"); + Throwables.propagate(e); + } + } + @Override protected void serviceStart() throws Exception { druidNodes.stream().forEach(node -> { @@ -191,4 +197,13 @@ public String getMetadataURI() { public String getDeepStorageDir() { return dataDirectory.getAbsolutePath() + File.separator + "deep-storage"; } + + public String getCoordinatorURI(){ + return "localhost:8081"; + } + + public String getOverlordURI(){ + // Overlord and coordinator both run in same JVM. + return getCoordinatorURI(); + } } diff --git a/itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java b/itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java new file mode 100644 index 0000000000..d839fd2db4 --- /dev/null +++ b/itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java @@ -0,0 +1,122 @@ +package org.apache.hive.kafka; + +import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServerStartable; +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; + +import org.apache.hadoop.service.AbstractService; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; + +import com.google.common.base.Throwables; +import com.google.common.io.Files; + +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.List; +import java.util.Properties; + +/** + * This class has the hooks to start and stop single node kafka cluster. 
+ * The kafka broker is started on port 9092 + */ +public class SingleNodeKafkaCluster extends AbstractService { + private static final Logger log = LoggerFactory.getLogger(SingleNodeKafkaCluster.class); + + private final KafkaServerStartable serverStartable; + private final String zkString; + + public SingleNodeKafkaCluster(String name, String logDir, Integer zkPort){ + super(name); + Properties properties = new Properties(); + this.zkString = String.format("localhost:%d", zkPort); + properties.setProperty("zookeeper.connect", zkString); + properties.setProperty("broker.id", String.valueOf(1)); + properties.setProperty("host.name", "localhost"); + properties.setProperty("port", Integer.toString(9092)); + properties.setProperty("log.dir", logDir); + properties.setProperty("log.flush.interval.messages", String.valueOf(1)); + properties.setProperty("offsets.topic.replication.factor", String.valueOf(1)); + properties.setProperty("offsets.topic.num.partitions", String.valueOf(1)); + properties.setProperty("transaction.state.log.replication.factor", String.valueOf(1)); + properties.setProperty("transaction.state.log.min.isr", String.valueOf(1)); + properties.setProperty("log.cleaner.dedupe.buffer.size", "1048577"); + + this.serverStartable = new KafkaServerStartable(KafkaConfig.fromProps(properties)); + } + + + @Override + protected void serviceStart() throws Exception { + serverStartable.startup(); + log.info("Kafka Server Started"); + + } + + @Override + protected void serviceStop() throws Exception { + log.info("Stopping Kafka Server"); + serverStartable.shutdown(); + log.info("Kafka Server Stopped"); + } + + /** + * Creates a topic and inserts data from the specified datafile. + * Each line in the datafile is sent to kafka as a single message. + * @param topicName + * @param datafile + */ + public void createTopicWithData(String topicName, File datafile){ + createTopic(topicName); + // set up kafka producer + Properties properties = new Properties(); + properties.put("bootstrap.servers", "localhost:9092"); + properties.put("acks", "1"); + properties.put("retries", "3"); + + try(KafkaProducer producer = new KafkaProducer<>( + properties, + new StringSerializer(), + new StringSerializer() + )){ + List events = Files.readLines(datafile, Charset.forName("UTF-8")); + for(String event : events){ + producer.send(new ProducerRecord(topicName, event)); + } + } catch (IOException e) { + Throwables.propagate(e); + } + + } + + public void createTopic(String topic) { + int sessionTimeoutMs = 1000; + ZkClient zkClient = new ZkClient( + this.zkString, sessionTimeoutMs, sessionTimeoutMs, + ZKStringSerializer$.MODULE$ + ); + ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zkString, sessionTimeoutMs), false); + int numPartitions = 1; + int replicationFactor = 1; + Properties topicConfig = new Properties(); + AdminUtils.createTopic( + zkUtils, + topic, + numPartitions, + replicationFactor, + topicConfig, + RackAwareMode.Disabled$.MODULE$ + ); + } + +} diff --git a/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java b/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java new file mode 100644 index 0000000000..4768975225 --- /dev/null +++ b/itests/qtest/src/test/java/org/apache/hadoop/hive/cli/TestMiniDruidKafkaCliDriver.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.cli; + +import org.apache.hadoop.hive.cli.control.CliAdapter; +import org.apache.hadoop.hive.cli.control.CliConfigs; + +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestRule; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import java.io.File; +import java.util.List; + +@RunWith(Parameterized.class) +public class TestMiniDruidKafkaCliDriver { + + static CliAdapter adapter = new CliConfigs.MiniDruidKafkaCliConfig().getCliAdapter(); + + @Parameters(name = "{0}") + public static List getParameters() throws Exception { + return adapter.getParameters(); + } + + @ClassRule + public static TestRule cliClassRule = adapter.buildClassRule(); + + @Rule + public TestRule cliTestRule = adapter.buildTestRule(); + + private String name; + private File qfile; + + public TestMiniDruidKafkaCliDriver(String name, File qfile) { + this.name = name; + this.qfile = qfile; + } + + @Test + public void testCliDriver() throws Exception { + adapter.runTest(name, qfile); + } + +} diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java index 7034c38b90..1e65569a3c 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java @@ -187,6 +187,29 @@ public MiniDruidCliConfig() { } } + public static class MiniDruidKafkaCliConfig extends AbstractCliConfig { + public MiniDruidKafkaCliConfig() { + super(CoreCliDriver.class); + try { + setQueryDir("ql/src/test/queries/clientpositive"); + + includesFrom(testConfigProps, "druid.kafka.query.files"); + + setResultsDir("ql/src/test/results/clientpositive/druid"); + setLogDir("itests/qtest/target/tmp/log"); + + setInitScript("q_test_druid_init.sql"); + setCleanupScript("q_test_cleanup_druid.sql"); + setHiveConfDir("data/conf/llap"); + setClusterType(MiniClusterType.druidKafka); + setMetastoreType(MetastoreType.sql); + setFsType(QTestUtil.FsType.hdfs); + } catch (Exception e) { + throw new RuntimeException("can't construct cliconfig", e); + } + } + } + public static class MiniLlapLocalCliConfig extends AbstractCliConfig { public MiniLlapLocalCliConfig() { diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java index 050f9d5765..3cdad284ef 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java @@ -67,6 +67,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Stream; +import 
junit.framework.TestSuite; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; import org.apache.commons.io.IOUtils; @@ -87,12 +88,11 @@ import org.apache.hadoop.hive.common.io.SortPrintStream; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.apache.hive.druid.MiniDruidCluster; import org.apache.hadoop.hive.llap.LlapItUtils; import org.apache.hadoop.hive.llap.daemon.MiniLlapCluster; import org.apache.hadoop.hive.llap.io.api.LlapProxy; import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.Task; @@ -123,20 +123,23 @@ import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hive.common.util.StreamPrinter; +import org.apache.hive.druid.MiniDruidCluster; +import org.apache.hive.kafka.SingleNodeKafkaCluster; import org.apache.logging.log4j.util.Strings; import org.apache.tools.ant.BuildException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; -import org.junit.Assert; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_NAME; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; -import junit.framework.TestSuite; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * QTestUtil. @@ -203,6 +206,7 @@ private final String cleanupScript; private MiniDruidCluster druidCluster; + private SingleNodeKafkaCluster kafkaCluster; public interface SuiteAddTestFunctor { public void addTestToSuite(TestSuite suite, Object setup, String tName); @@ -402,6 +406,8 @@ public void initConf() throws Exception { conf.set("hive.druid.storage.storageDirectory", druidDeepStorage.toUri().getPath()); conf.set("hive.druid.metadata.db.type", "derby"); conf.set("hive.druid.metadata.uri", druidCluster.getMetadataURI()); + conf.set("hive.druid.coordinator.address.default", druidCluster.getCoordinatorURI()); + conf.set("hive.druid.overlord.address.default", druidCluster.getOverlordURI()); final Path scratchDir = fs .makeQualified(new Path(System.getProperty("test.tmp.dir"), "druidStagingDir")); fs.mkdirs(scratchDir); @@ -501,7 +507,9 @@ private void createRemoteDirs() { llap(CoreClusterType.TEZ, FsType.hdfs), llap_local(CoreClusterType.TEZ, FsType.local), none(CoreClusterType.MR, FsType.local), - druid(CoreClusterType.TEZ, FsType.hdfs); + druid(CoreClusterType.TEZ, FsType.hdfs), + druidKafka(CoreClusterType.TEZ, FsType.hdfs), + kafka(CoreClusterType.TEZ, FsType.hdfs); private final CoreClusterType coreClusterType; @@ -537,8 +545,11 @@ public static MiniClusterType valueForString(String type) { } else if (type.equals("llap_local")) { return llap_local; } else if (type.equals("druid")) { - return druid; - } else { + return druid; + } else if (type.equals("druid-kafka")) { + return druidKafka; + } + else { return none; } } @@ -630,11 +641,7 @@ public QTestUtil(String outDir, String logDir, MiniClusterType clusterType, ? 
new File(new File(dataDir).getAbsolutePath() + "/datasets") : new File(conf.get("test.data.set.files")); - // Use the current directory if it is not specified - String scriptsDir = conf.get("test.data.scripts"); - if (scriptsDir == null) { - scriptsDir = new File(".").getAbsolutePath() + "/data/scripts"; - } + String scriptsDir = getScriptsDir(); this.initScript = scriptsDir + File.separator + initScript; this.cleanupScript = scriptsDir + File.separator + cleanupScript; @@ -643,6 +650,14 @@ public QTestUtil(String outDir, String logDir, MiniClusterType clusterType, init(); } + private String getScriptsDir() { + // Use the current directory if it is not specified + String scriptsDir = conf.get("test.data.scripts"); + if (scriptsDir == null) { + scriptsDir = new File(".").getAbsolutePath() + "/data/scripts"; + } + return scriptsDir; + } private void setupFileSystem(HadoopShims shims) throws IOException { @@ -678,7 +693,7 @@ private void setupMiniCluster(HadoopShims shims, String confDir) throws String uriString = fs.getUri().toString(); - if (clusterType == MiniClusterType.druid) { + if (clusterType == MiniClusterType.druid || clusterType == MiniClusterType.druidKafka) { final String tempDir = System.getProperty("test.tmp.dir"); druidCluster = new MiniDruidCluster("mini-druid", getLogDirectory(), @@ -699,6 +714,19 @@ private void setupMiniCluster(HadoopShims shims, String confDir) throws druidCluster.start(); } + if(clusterType == MiniClusterType.kafka || clusterType == MiniClusterType.druidKafka) { + kafkaCluster = new SingleNodeKafkaCluster("kafka", + getLogDirectory() + "/kafka-cluster", + setup.zkPort + ); + kafkaCluster.init(conf); + kafkaCluster.start(); + kafkaCluster.createTopicWithData( + "test-topic", + new File(getScriptsDir(), "kafka_init_data.json") + ); + } + if (clusterType.getCoreClusterType() == CoreClusterType.TEZ) { if (confDir != null && !confDir.isEmpty()) { conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() @@ -735,6 +763,11 @@ public void shutdown() throws Exception { druidCluster.stop(); druidCluster = null; } + + if (kafkaCluster != null) { + kafkaCluster.stop(); + kafkaCluster = null; + } setup.tearDown(); if (sparkSession != null) { try { diff --git a/pom.xml b/pom.xml index 194a7d0bb1..8fc9a755ea 100644 --- a/pom.xml +++ b/pom.xml @@ -127,7 +127,7 @@ 4.1.19 3.2.0-m3 1.2 - 1.4 + 1.7 3.2.2 1.9 1.1 diff --git a/ql/src/test/queries/clientpositive/druidkafkamini_basic.q b/ql/src/test/queries/clientpositive/druidkafkamini_basic.q new file mode 100644 index 0000000000..6b0c3c6455 --- /dev/null +++ b/ql/src/test/queries/clientpositive/druidkafkamini_basic.q @@ -0,0 +1,73 @@ +CREATE TABLE druid_kafka_test(`__time` timestamp, page string, `user` string, language string, added int, deleted int) + STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "MONTH", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = "test-topic", + "druid.kafka.ingestion.useEarliestOffset" = "true", + "druid.kafka.ingestion" = "START", + "druid.kafka.ingestion.maxRowsInMemory" = "5", + "druid.kafka.ingestion.startDelay" = "PT1S", + "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.period" = "PT1S" + ); + +!curl -ss http://localhost:8081/druid/indexer/v1/supervisor; + +-- Sleep for some time for ingestion tasks to ingest events +!sleep 50; + +DESCRIBE druid_kafka_test; +DESCRIBE EXTENDED druid_kafka_test; + +Select count(*) FROM druid_kafka_test; 
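+-- The topic is seeded with ten events from kafka_init_data.json, so the count above is expected to
+-- reach 10 once the ingestion tasks hand their segments off.
+-- Illustration only (not exercised by this test, and the STOP value is assumed rather than verified
+-- here): ingestion could later be halted through the same table property, e.g.
+-- ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'STOP');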
+ +Select page FROM druid_kafka_test order by page; + +-- Reset kafka Ingestion, this would reset the offsets and since we are using useEarliestOffset, +-- We will see records duplicated after successful reset. +ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'RESET'); + +-- Sleep for some time for ingestion tasks to ingest events +!sleep 50; + +DESCRIBE druid_kafka_test; +DESCRIBE EXTENDED druid_kafka_test; + +Select count(*) FROM druid_kafka_test; + +Select page FROM druid_kafka_test order by page; + +-- Join against other normal tables +CREATE TABLE languages(shortname string, fullname string); + +INSERT INTO languages values +("en", "english"), +("ru", "russian"); + +EXPLAIN EXTENDED +SELECT a.fullname, b.`user` +FROM +( +(SELECT fullname, shortname +FROM languages) a +JOIN +(SELECT language, `user` +FROM druid_kafka_test) b + ON a.shortname = b.language +); + +SELECT a.fullname, b.`user` +FROM +( +(SELECT fullname, shortname +FROM languages) a +JOIN +(SELECT language, `user` +FROM druid_kafka_test) b + ON a.shortname = b.language +) order by b.`user`; + +DROP TABLE druid_kafka_test; +DROP TABLE druid_table_1; \ No newline at end of file diff --git a/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out b/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out new file mode 100644 index 0000000000..4bf0b1c551 --- /dev/null +++ b/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out @@ -0,0 +1,475 @@ +PREHOOK: query: CREATE TABLE druid_kafka_test(`__time` timestamp, page string, `user` string, language string, added int, deleted int) + STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "MONTH", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = "test-topic", + "druid.kafka.ingestion.useEarliestOffset" = "true", + "druid.kafka.ingestion.state" = "START", + "druid.kafka.ingestion.maxRowsInMemory" = "5", + "druid.kafka.ingestion.startDelay" = "PT1S", + "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.period" = "PT1S" + ) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@druid_kafka_test +POSTHOOK: query: CREATE TABLE druid_kafka_test(`__time` timestamp, page string, `user` string, language string, added int, deleted int) + STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' + TBLPROPERTIES ( + "druid.segment.granularity" = "MONTH", + "druid.query.granularity" = "MINUTE", + "kafka.bootstrap.servers" = "localhost:9092", + "kafka.topic" = "test-topic", + "druid.kafka.ingestion.useEarliestOffset" = "true", + "druid.kafka.ingestion.state" = "START", + "druid.kafka.ingestion.maxRowsInMemory" = "5", + "druid.kafka.ingestion.startDelay" = "PT1S", + "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.period" = "PT1S" + ) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@druid_kafka_test +["default.druid_kafka_test"] +PREHOOK: query: DESCRIBE druid_kafka_test +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@druid_kafka_test +POSTHOOK: query: DESCRIBE druid_kafka_test +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@druid_kafka_test +__time timestamp from deserializer +page string from deserializer +user string from deserializer +language string from deserializer +added int from deserializer +deleted int from deserializer +PREHOOK: query: DESCRIBE EXTENDED druid_kafka_test +PREHOOK: 
type: DESCTABLE +PREHOOK: Input: default@druid_kafka_test +POSTHOOK: query: DESCRIBE EXTENDED druid_kafka_test +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@druid_kafka_test +__time timestamp from deserializer +page string from deserializer +user string from deserializer +language string from deserializer +added int from deserializer +deleted int from deserializer + +#### A masked pattern was here #### +PREHOOK: query: Select count(*) FROM druid_kafka_test +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_kafka_test +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select count(*) FROM druid_kafka_test +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_kafka_test +POSTHOOK: Output: hdfs://### HDFS PATH ### +10 +PREHOOK: query: Select page FROM druid_kafka_test order by page +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_kafka_test +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select page FROM druid_kafka_test order by page +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_kafka_test +POSTHOOK: Output: hdfs://### HDFS PATH ### +Cherno Alpha +Cherno Alpha +Coyote Tango +Coyote Tango +Crimson Typhoon +Crimson Typhoon +Gypsy Danger +Gypsy Danger +Striker Eureka +Striker Eureka +PREHOOK: query: ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion.state' = 'RESET') +PREHOOK: type: ALTERTABLE_PROPERTIES +PREHOOK: Input: default@druid_kafka_test +PREHOOK: Output: default@druid_kafka_test +POSTHOOK: query: ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion.state' = 'RESET') +POSTHOOK: type: ALTERTABLE_PROPERTIES +POSTHOOK: Input: default@druid_kafka_test +POSTHOOK: Output: default@druid_kafka_test +PREHOOK: query: DESCRIBE druid_kafka_test +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@druid_kafka_test +POSTHOOK: query: DESCRIBE druid_kafka_test +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@druid_kafka_test +__time timestamp from deserializer +page string from deserializer +user string from deserializer +language string from deserializer +added int from deserializer +deleted int from deserializer +PREHOOK: query: DESCRIBE EXTENDED druid_kafka_test +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@druid_kafka_test +POSTHOOK: query: DESCRIBE EXTENDED druid_kafka_test +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@druid_kafka_test +__time timestamp from deserializer +page string from deserializer +user string from deserializer +language string from deserializer +added int from deserializer +deleted int from deserializer + +#### A masked pattern was here #### +PREHOOK: query: Select count(*) FROM druid_kafka_test +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_kafka_test +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select count(*) FROM druid_kafka_test +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_kafka_test +POSTHOOK: Output: hdfs://### HDFS PATH ### +10 +PREHOOK: query: Select page FROM druid_kafka_test order by page +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_kafka_test +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: Select page FROM druid_kafka_test order by page +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_kafka_test +POSTHOOK: Output: hdfs://### HDFS PATH ### +Cherno Alpha +Cherno Alpha +Coyote Tango +Coyote Tango +Crimson Typhoon +Crimson Typhoon +Gypsy Danger +Gypsy Danger +Striker Eureka +Striker Eureka +PREHOOK: query: CREATE TABLE languages(shortname string, fullname string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: 
database:default +PREHOOK: Output: default@languages +POSTHOOK: query: CREATE TABLE languages(shortname string, fullname string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@languages +PREHOOK: query: INSERT INTO languages values +("en", "english"), +("ru", "russian") +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@languages +POSTHOOK: query: INSERT INTO languages values +("en", "english"), +("ru", "russian") +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@languages +POSTHOOK: Lineage: languages.fullname SCRIPT [] +POSTHOOK: Lineage: languages.shortname SCRIPT [] +PREHOOK: query: EXPLAIN EXTENDED +SELECT a.fullname, b.`user` +FROM +( +(SELECT fullname, shortname +FROM languages) a +JOIN +(SELECT language, `user` +FROM druid_kafka_test) b + ON a.shortname = b.language +) +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN EXTENDED +SELECT a.fullname, b.`user` +FROM +( +(SELECT fullname, shortname +FROM languages) a +JOIN +(SELECT language, `user` +FROM druid_kafka_test) b + ON a.shortname = b.language +) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 3 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: languages + Statistics: Num rows: 2 Data size: 354 Basic stats: COMPLETE Column stats: COMPLETE + GatherStats: false + Filter Operator + isSamplingPred: false + predicate: shortname is not null (type: boolean) + Statistics: Num rows: 2 Data size: 354 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: fullname (type: string), shortname (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 2 Data size: 354 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col1 (type: string) + null sort order: a + sort order: + + Map-reduce partition columns: _col1 (type: string) + Statistics: Num rows: 2 Data size: 354 Basic stats: COMPLETE Column stats: COMPLETE + tag: 0 + value expressions: _col0 (type: string) + auto parallelism: true + Path -> Alias: + hdfs://### HDFS PATH ### [languages] + Path -> Partition: + hdfs://### HDFS PATH ### + Partition + base file name: languages + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"fullname":"true","shortname":"true"}} + bucket_count -1 + column.name.delimiter , + columns shortname,fullname + columns.comments + columns.types string:string +#### A masked pattern was here #### + location hdfs://### HDFS PATH ### + name default.languages + numFiles 1 + numRows 2 + rawDataSize 20 + serialization.ddl struct languages { string shortname, string fullname} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + totalSize 22 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + properties: + COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"fullname":"true","shortname":"true"}} + bucket_count -1 + column.name.delimiter , + 
columns shortname,fullname + columns.comments + columns.types string:string +#### A masked pattern was here #### + location hdfs://### HDFS PATH ### + name default.languages + numFiles 1 + numRows 2 + rawDataSize 20 + serialization.ddl struct languages { string shortname, string fullname} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + totalSize 22 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.languages + name: default.languages + Truncated Path -> Alias: + /languages [languages] + Map 3 + Map Operator Tree: + TableScan + alias: druid_kafka_test + properties: + druid.query.json {"queryType":"select","dataSource":"default.druid_kafka_test","descending":false,"intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"],"filter":{"type":"not","field":{"type":"selector","dimension":"language","value":null}},"dimensions":["language","user"],"metrics":[],"granularity":"all","pagingSpec":{"threshold":16384,"fromNext":true},"context":{"druid.query.fetch":false}} + druid.query.type select + Statistics: Num rows: 1 Data size: 368 Basic stats: COMPLETE Column stats: NONE + GatherStats: false + Reduce Output Operator + key expressions: language (type: string) + null sort order: a + sort order: + + Map-reduce partition columns: language (type: string) + Statistics: Num rows: 1 Data size: 368 Basic stats: COMPLETE Column stats: NONE + tag: 1 + value expressions: user (type: string) + auto parallelism: true + Path -> Alias: + hdfs://### HDFS PATH ### [druid_kafka_test] + Path -> Partition: + hdfs://### HDFS PATH ### + Partition + base file name: druid_kafka_test + input format: org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat + output format: org.apache.hadoop.hive.druid.io.DruidOutputFormat + properties: + COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"__time":"true","added":"true","deleted":"true","language":"true","page":"true","user":"true"}} + bucket_count -1 + column.name.delimiter , + columns __time,page,user,language,added,deleted + columns.comments + columns.types timestamp:string:string:string:int:int + druid.datasource default.druid_kafka_test + druid.kafka.ingestion.maxRowsInMemory 5 + druid.kafka.ingestion.period PT1S + druid.kafka.ingestion.startDelay PT1S + druid.kafka.ingestion.state START + druid.kafka.ingestion.taskDuration PT30S + druid.kafka.ingestion.useEarliestOffset true + druid.query.granularity MINUTE + druid.query.json {"queryType":"select","dataSource":"default.druid_kafka_test","descending":false,"intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"],"filter":{"type":"not","field":{"type":"selector","dimension":"language","value":null}},"dimensions":["language","user"],"metrics":[],"granularity":"all","pagingSpec":{"threshold":16384,"fromNext":true},"context":{"druid.query.fetch":false}} + druid.query.type select + druid.segment.granularity MONTH +#### A masked pattern was here #### + kafka.bootstrap.servers localhost:9092 + kafka.topic test-topic +#### A masked pattern was here #### + location hdfs://### HDFS PATH ### + name default.druid_kafka_test + numFiles 0 + numRows 0 + rawDataSize 0 + serialization.ddl struct druid_kafka_test { timestamp __time, string page, string user, string language, i32 added, i32 deleted} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.druid.serde.DruidSerDe + storage_handler org.apache.hadoop.hive.druid.DruidStorageHandler + totalSize 0 +#### A masked pattern was here 
#### + serde: org.apache.hadoop.hive.druid.serde.DruidSerDe + + input format: org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat + output format: org.apache.hadoop.hive.druid.io.DruidOutputFormat + properties: + COLUMN_STATS_ACCURATE {"BASIC_STATS":"true","COLUMN_STATS":{"__time":"true","added":"true","deleted":"true","language":"true","page":"true","user":"true"}} + bucket_count -1 + column.name.delimiter , + columns __time,page,user,language,added,deleted + columns.comments + columns.types timestamp:string:string:string:int:int + druid.datasource default.druid_kafka_test + druid.kafka.ingestion.maxRowsInMemory 5 + druid.kafka.ingestion.period PT1S + druid.kafka.ingestion.startDelay PT1S + druid.kafka.ingestion.state START + druid.kafka.ingestion.taskDuration PT30S + druid.kafka.ingestion.useEarliestOffset true + druid.query.granularity MINUTE + druid.query.json {"queryType":"select","dataSource":"default.druid_kafka_test","descending":false,"intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"],"filter":{"type":"not","field":{"type":"selector","dimension":"language","value":null}},"dimensions":["language","user"],"metrics":[],"granularity":"all","pagingSpec":{"threshold":16384,"fromNext":true},"context":{"druid.query.fetch":false}} + druid.query.type select + druid.segment.granularity MONTH +#### A masked pattern was here #### + kafka.bootstrap.servers localhost:9092 + kafka.topic test-topic +#### A masked pattern was here #### + location hdfs://### HDFS PATH ### + name default.druid_kafka_test + numFiles 0 + numRows 0 + rawDataSize 0 + serialization.ddl struct druid_kafka_test { timestamp __time, string page, string user, string language, i32 added, i32 deleted} + serialization.format 1 + serialization.lib org.apache.hadoop.hive.druid.serde.DruidSerDe + storage_handler org.apache.hadoop.hive.druid.DruidStorageHandler + totalSize 0 +#### A masked pattern was here #### + serde: org.apache.hadoop.hive.druid.serde.DruidSerDe + name: default.druid_kafka_test + name: default.druid_kafka_test + Truncated Path -> Alias: + /druid_kafka_test [druid_kafka_test] + Reducer 2 + Needs Tagging: false + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col1 (type: string) + 1 language (type: string) + outputColumnNames: _col0, _col4 + Position of Big Table: 0 + Statistics: Num rows: 2 Data size: 389 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: string), _col4 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 2 Data size: 389 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + GlobalTableId: 0 + directory: hdfs://### HDFS PATH ### + NumFilesPerFileSink: 1 + Statistics: Num rows: 2 Data size: 389 Basic stats: COMPLETE Column stats: NONE + Stats Publishing Key Prefix: hdfs://### HDFS PATH ### + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + properties: + columns _col0,_col1 + columns.types string:string + escape.delim \ + hive.serialization.extend.additional.nesting.levels true + serialization.escape.crlf true + serialization.format 1 + serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + TotalFiles: 1 + GatherStats: false + MultiFileSpray: false + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: SELECT a.fullname, b.`user` 
+FROM +( +(SELECT fullname, shortname +FROM languages) a +JOIN +(SELECT language, `user` +FROM druid_kafka_test) b + ON a.shortname = b.language +) order by b.`user` +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_kafka_test +PREHOOK: Input: default@languages +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: SELECT a.fullname, b.`user` +FROM +( +(SELECT fullname, shortname +FROM languages) a +JOIN +(SELECT language, `user` +FROM druid_kafka_test) b + ON a.shortname = b.language +) order by b.`user` +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_kafka_test +POSTHOOK: Input: default@languages +POSTHOOK: Output: hdfs://### HDFS PATH ### +russian masterYi +russian masterYi +english nuclear +english nuclear +english speed +english speed +PREHOOK: query: DROP TABLE druid_kafka_test +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@druid_kafka_test +PREHOOK: Output: default@druid_kafka_test +POSTHOOK: query: DROP TABLE druid_kafka_test +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@druid_kafka_test +POSTHOOK: Output: default@druid_kafka_test +PREHOOK: query: DROP TABLE druid_table_1 +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE druid_table_1 +POSTHOOK: type: DROPTABLE -- 2.11.0 (Apple Git-81)
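For reference, the KafkaSupervisorSpec / KafkaSupervisorIOConfig / KafkaSupervisorTuningConfig classes added above model the supervisor spec JSON that Druid's Kafka indexing service consumes. For the druid_kafka_test table created in druidkafkamini_basic.q, the serialized spec would look roughly like the sketch below. This is illustrative only: the dataSchema body is elided, the "topic" and "dataSource" key names come from Druid's Kafka indexing service rather than from this patch, and omitted fields fall back to the defaults in KafkaSupervisorTuningConfig/KafkaTuningConfig.

    {
      "type": "kafka",
      "dataSchema": { "dataSource": "default.druid_kafka_test", ... },
      "tuningConfig": { "type": "kafka", "maxRowsInMemory": 5 },
      "ioConfig": {
        "topic": "test-topic",
        "consumerProperties": { "bootstrap.servers": "localhost:9092" },
        "taskDuration": "PT30S",
        "startDelay": "PT1S",
        "period": "PT1S",
        "useEarliestOffset": true
      }
    }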