From 53758c5ae7825a255dba6288df08e0f710e5945a Mon Sep 17 00:00:00 2001 From: Nishant Date: Fri, 13 Apr 2018 22:11:07 +0530 Subject: [PATCH] [HIVE-19173] Add Kafka indexing runtime information as part of DESCRIBE EXTENDED fix tests --- .../hadoop/hive/druid/DruidStorageHandler.java | 96 +++++++-- .../hadoop/hive/druid/DruidStorageHandlerInfo.java | 167 +++++++++++++++ .../hive/druid/json/KafkaSupervisorReport.java | 231 +++++++++++++++++++++ .../hadoop/hive/druid/json/TaskReportData.java | 125 +++++++++++ .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 7 +- .../org/apache/hadoop/hive/ql/metadata/Hive.java | 62 +++--- .../hive/ql/metadata/HiveStorageHandler.java | 17 +- .../hive/ql/metadata/StorageHandlerInfo.java | 38 ++++ .../metadata/formatting/JsonMetaDataFormatter.java | 6 +- .../ql/metadata/formatting/MetaDataFormatter.java | 4 +- .../metadata/formatting/TextMetaDataFormatter.java | 11 +- .../queries/clientpositive/druidkafkamini_basic.q | 6 +- .../druid/druidkafkamini_basic.q.out | 26 ++- 13 files changed, 734 insertions(+), 62 deletions(-) create mode 100644 druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerInfo.java create mode 100644 druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorReport.java create mode 100644 druid-handler/src/java/org/apache/hadoop/hive/druid/json/TaskReportData.java create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/metadata/StorageHandlerInfo.java diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java index 76540b76b3..ce9f833655 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java @@ -17,25 +17,6 @@ */ package org.apache.hadoop.hive.druid; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import com.metamx.common.RetryUtils; -import com.metamx.common.lifecycle.Lifecycle; -import com.metamx.http.client.HttpClient; -import com.metamx.http.client.HttpClientConfig; -import com.metamx.http.client.HttpClientInit; -import com.metamx.http.client.Request; -import com.metamx.http.client.response.StatusResponseHandler; -import com.metamx.http.client.response.StatusResponseHolder; import io.druid.data.input.impl.DimensionSchema; import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.InputRowParser; @@ -59,6 +40,7 @@ import io.druid.storage.hdfs.HdfsDataSegmentPusher; import io.druid.storage.hdfs.HdfsDataSegmentPusherConfig; import io.druid.timeline.DataSegment; + import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -69,6 +51,7 @@ import org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat; import org.apache.hadoop.hive.druid.io.DruidRecordWriter; import org.apache.hadoop.hive.druid.json.KafkaSupervisorIOConfig; +import org.apache.hadoop.hive.druid.json.KafkaSupervisorReport; import org.apache.hadoop.hive.druid.json.KafkaSupervisorSpec; import 
org.apache.hadoop.hive.druid.json.KafkaSupervisorTuningConfig; import org.apache.hadoop.hive.druid.security.KerberosHttpClient; @@ -83,6 +66,7 @@ import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; +import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider; import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider; @@ -96,6 +80,26 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hive.common.util.ShutdownHookManager; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.metamx.common.RetryUtils; +import com.metamx.common.lifecycle.Lifecycle; +import com.metamx.http.client.HttpClient; +import com.metamx.http.client.HttpClientConfig; +import com.metamx.http.client.HttpClientInit; +import com.metamx.http.client.Request; +import com.metamx.http.client.response.StatusResponseHandler; +import com.metamx.http.client.response.StatusResponseHolder; + import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.joda.time.DateTime; @@ -452,7 +456,7 @@ private void resetKafkaIngestion(String overlordAddress, String dataSourceName) console.printInfo("Druid Kafka Ingestion Reset successful."); } else { throw new IOException(String - .format("Unable to stop Kafka Ingestion Druid status [%d] full response [%s]", + .format("Unable to reset Kafka Ingestion Druid status [%d] full response [%s]", response.getStatus().getCode(), response.getContent())); } } catch (Exception e) { @@ -484,7 +488,7 @@ private void stopKafkaIngestion(String overlordAddress, String dataSourceName) { } - public KafkaSupervisorSpec fetchKafkaIngestionSpec(Table table) { + private KafkaSupervisorSpec fetchKafkaIngestionSpec(Table table) { // Stop Kafka Ingestion first final String overlordAddress = Preconditions.checkNotNull(HiveConf .getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_OVERLORD_DEFAULT_ADDRESS), @@ -510,7 +514,7 @@ public KafkaSupervisorSpec fetchKafkaIngestionSpec(Table table) { return null; } else { throw new IOException(String - .format("Unable to stop Kafka Ingestion Druid status [%d] full response [%s]", + .format("Unable to fetch Kafka Ingestion Spec from Druid status [%d] full response [%s]", response.getStatus().getCode(), response.getContent())); } } catch (Exception e) { @@ -518,7 +522,39 @@ public KafkaSupervisorSpec fetchKafkaIngestionSpec(Table table) { } } - + private KafkaSupervisorReport fetchKafkaSupervisorReport(Table table) { + final String overlordAddress = Preconditions.checkNotNull(HiveConf + .getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_OVERLORD_DEFAULT_ADDRESS), + "Druid Overlord Address is null"); + String dataSourceName = Preconditions + .checkNotNull(getTableProperty(table, Constants.DRUID_DATA_SOURCE), + "Druid Datasource name is null"); + try { + StatusResponseHolder 
response = RetryUtils.retry(() -> getHttpClient().go(new Request(HttpMethod.GET, + new URL(String + .format("http://%s/druid/indexer/v1/supervisor/%s/status", overlordAddress, + dataSourceName))), + new StatusResponseHandler( + Charset.forName("UTF-8"))).get(), + input -> input instanceof IOException, + getMaxRetryCount()); + if (response.getStatus().equals(HttpResponseStatus.OK)) { + return DruidStorageHandlerUtils.JSON_MAPPER + .readValue(response.getContent(), KafkaSupervisorReport.class); + // Druid Returns 400 Bad Request when not found. + } else if (response.getStatus().equals(HttpResponseStatus.NOT_FOUND) || response.getStatus().equals(HttpResponseStatus.BAD_REQUEST)) { + LOG.debug("No Kafka Supervisor found for datasource[%s]", dataSourceName); + return null; + } else { + throw new IOException(String + .format("Unable to fetch Kafka Supervisor status [%d] full response [%s]", + response.getStatus().getCode(), response.getContent())); + } + } catch (Exception e) { + throw new RuntimeException("Exception while fetching kafka ingestion spec from druid", e); + } + } + protected void loadDruidSegments(Table table, boolean overwrite) throws MetaException { // at this point we have Druid segments from reducers but we need to atomically // rename and commit to metadata @@ -970,6 +1006,7 @@ public void preAlterTable(Table table, EnvironmentContext context) throws MetaEx updateKafkaIngestion(table); } } + private static Boolean getBooleanProperty(Table table, String propertyName) { String val = getTableProperty(table, propertyName); if (val == null) { @@ -1032,4 +1069,17 @@ private static boolean isKafkaStreamingTable(Table table){ private int getMaxRetryCount() { return HiveConf.getIntVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_MAX_TRIES); } + + @Override + public StorageHandlerInfo getStorageHandlerInfo(Table table) throws MetaException { + if(isKafkaStreamingTable(table)){ + KafkaSupervisorReport kafkaSupervisorReport = fetchKafkaSupervisorReport(table); + return new DruidStorageHandlerInfo(kafkaSupervisorReport); + } + else + // Currently we do not expose any runtime info for non-streaming tables. + // In future extend this add more information regarding table status. + // e.g. Total size of segments in druid, loadstatus of table on historical nodes etc. + return null; + } } diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerInfo.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerInfo.java new file mode 100644 index 0000000000..99bb28593e --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerInfo.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.druid; + +import io.druid.data.input.impl.DimensionSchema; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.InputRowParser; +import io.druid.data.input.impl.JSONParseSpec; +import io.druid.data.input.impl.StringInputRowParser; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.java.util.common.Pair; +import io.druid.metadata.MetadataStorageConnectorConfig; +import io.druid.metadata.MetadataStorageTablesConfig; +import io.druid.metadata.SQLMetadataConnector; +import io.druid.metadata.storage.derby.DerbyConnector; +import io.druid.metadata.storage.derby.DerbyMetadataStorage; +import io.druid.metadata.storage.mysql.MySQLConnector; +import io.druid.metadata.storage.postgresql.PostgreSQLConnector; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.segment.IndexSpec; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.granularity.GranularitySpec; +import io.druid.segment.loading.DataSegmentPusher; +import io.druid.segment.loading.SegmentLoadingException; +import io.druid.storage.hdfs.HdfsDataSegmentPusher; +import io.druid.storage.hdfs.HdfsDataSegmentPusherConfig; +import io.druid.timeline.DataSegment; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.Constants; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.druid.io.DruidOutputFormat; +import org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat; +import org.apache.hadoop.hive.druid.io.DruidRecordWriter; +import org.apache.hadoop.hive.druid.json.KafkaSupervisorIOConfig; +import org.apache.hadoop.hive.druid.json.KafkaSupervisorReport; +import org.apache.hadoop.hive.druid.json.KafkaSupervisorSpec; +import org.apache.hadoop.hive.druid.json.KafkaSupervisorTuningConfig; +import org.apache.hadoop.hive.druid.security.KerberosHttpClient; +import org.apache.hadoop.hive.druid.serde.DruidSerDe; +import org.apache.hadoop.hive.metastore.DefaultHiveMetaHook; +import org.apache.hadoop.hive.metastore.HiveMetaHook; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.EnvironmentContext; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.metadata.ForeignKeyInfo; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; +import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider; +import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import 
org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hive.common.util.ShutdownHookManager; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.metamx.common.RetryUtils; +import com.metamx.common.lifecycle.Lifecycle; +import com.metamx.http.client.HttpClient; +import com.metamx.http.client.HttpClientConfig; +import com.metamx.http.client.HttpClientInit; +import com.metamx.http.client.Request; +import com.metamx.http.client.response.StatusResponseHandler; +import com.metamx.http.client.response.StatusResponseHolder; + +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.joda.time.DateTime; +import org.joda.time.Period; +import org.skife.jdbi.v2.exceptions.CallbackFailedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * DruidStorageHandlerInfo provides a runtime information for DruidStorageHandler. 
+ */ +@SuppressWarnings("serial") +public class DruidStorageHandlerInfo implements StorageHandlerInfo { + + private final KafkaSupervisorReport kafkaSupervisorReport; + + DruidStorageHandlerInfo(KafkaSupervisorReport kafkaSupervisorReport) { + this.kafkaSupervisorReport = kafkaSupervisorReport; + } + + @Override + public String formatAsText() { + StringBuilder sb = new StringBuilder(); + sb.append("Druid Storage Handler Runtime Status for " + kafkaSupervisorReport.getId()); + sb.append("\n"); + sb.append("kafkaPartitions=" + kafkaSupervisorReport.getPayload().getPartitions()); + sb.append("\n"); + sb.append("activeTasks=" + kafkaSupervisorReport.getPayload().getActiveTasks()); + sb.append("\n"); + sb.append("publishingTasks=" + kafkaSupervisorReport.getPayload().getPublishingTasks()); + if (kafkaSupervisorReport.getPayload().getLatestOffsets() != null) { + sb.append("\n"); + sb.append("latestOffsets=" + kafkaSupervisorReport.getPayload().getLatestOffsets()); + } + if (kafkaSupervisorReport.getPayload().getMinimumLag() != null) { + sb.append("\n"); + sb.append("minimumLag=" + kafkaSupervisorReport.getPayload().getMinimumLag()); + } + if (kafkaSupervisorReport.getPayload().getAggregateLag() != null) { + sb.append("\n"); + sb.append("aggregateLag=" + kafkaSupervisorReport.getPayload().getAggregateLag()); + } + if (kafkaSupervisorReport.getPayload().getOffsetsLastUpdated() != null) { + sb.append("\n"); + sb.append("lastUpdateTime=" + kafkaSupervisorReport.getPayload().getOffsetsLastUpdated()); + } + return sb.toString(); + } +} diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorReport.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorReport.java new file mode 100644 index 0000000000..5a6756ecbf --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorReport.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.druid.json; + +import io.druid.guice.annotations.Json; +import io.druid.indexing.overlord.supervisor.SupervisorReport; +import io.druid.java.util.common.IAE; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Lists; + +import org.joda.time.DateTime; + +import java.util.List; +import java.util.Map; + +import javax.annotation.Nullable; + +/** + * This class is copied from druid source code + * in order to avoid adding additional dependencies on druid-indexing-service. + */ +public class KafkaSupervisorReport extends SupervisorReport +{ + public static class KafkaSupervisorReportPayload + { + private final String dataSource; + private final String topic; + private final Integer partitions; + private final Integer replicas; + private final Long durationSeconds; + private final List activeTasks; + private final List publishingTasks; + private final Map latestOffsets; + private final Map minimumLag; + private final Long aggregateLag; + private final DateTime offsetsLastUpdated; + + @JsonCreator + public KafkaSupervisorReportPayload( + @JsonProperty("dataSource") String dataSource, + @JsonProperty("topic") String topic, + @JsonProperty("partitions") Integer partitions, + @JsonProperty("replicas") Integer replicas, + @JsonProperty("durationSeconds") Long durationSeconds, + @Nullable @JsonProperty("latestOffsets") Map latestOffsets, + @Nullable @JsonProperty("minimumLag") Map minimumLag, + @Nullable @JsonProperty("aggregateLag") Long aggregateLag, + @Nullable @JsonProperty("offsetsLastUpdated") DateTime offsetsLastUpdated + ) + { + this.dataSource = dataSource; + this.topic = topic; + this.partitions = partitions; + this.replicas = replicas; + this.durationSeconds = durationSeconds; + this.activeTasks = Lists.newArrayList(); + this.publishingTasks = Lists.newArrayList(); + this.latestOffsets = latestOffsets; + this.minimumLag = minimumLag; + this.aggregateLag = aggregateLag; + this.offsetsLastUpdated = offsetsLastUpdated; + } + + @JsonProperty + public String getDataSource() + { + return dataSource; + } + + @JsonProperty + public String getTopic() + { + return topic; + } + + @JsonProperty + public Integer getPartitions() + { + return partitions; + } + + @JsonProperty + public Integer getReplicas() + { + return replicas; + } + + @JsonProperty + public Long getDurationSeconds() + { + return durationSeconds; + } + + @JsonProperty + public List getActiveTasks() + { + return activeTasks; + } + + @JsonProperty + public List getPublishingTasks() + { + return publishingTasks; + } + + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + public Map getLatestOffsets() + { + return latestOffsets; + } + + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + public Map getMinimumLag() + { + return minimumLag; + } + + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + public Long getAggregateLag() + { + return aggregateLag; + } + + @JsonProperty + public DateTime getOffsetsLastUpdated() + { + return offsetsLastUpdated; + } + + @Override + public String toString() + { + return "{" + + 
"dataSource='" + dataSource + '\'' + + ", topic='" + topic + '\'' + + ", partitions=" + partitions + + ", replicas=" + replicas + + ", durationSeconds=" + durationSeconds + + ", active=" + activeTasks + + ", publishing=" + publishingTasks + + (latestOffsets != null ? ", latestOffsets=" + latestOffsets : "") + + (minimumLag != null ? ", minimumLag=" + minimumLag : "") + + (aggregateLag != null ? ", aggregateLag=" + aggregateLag : "") + + (offsetsLastUpdated != null ? ", offsetsLastUpdated=" + offsetsLastUpdated : "") + + '}'; + } + } + + private final KafkaSupervisorReportPayload payload; + + @JsonCreator + public KafkaSupervisorReport(@JsonProperty("id") String id, + @JsonProperty("generationTime")DateTime generationTime, + @JsonProperty("payload") KafkaSupervisorReportPayload payload){ + super(id, generationTime); + this.payload = payload; + } + + public KafkaSupervisorReport( + String dataSource, + DateTime generationTime, + String topic, + Integer partitions, + Integer replicas, + Long durationSeconds, + @Nullable Map latestOffsets, + @Nullable Map minimumLag, + @Nullable Long aggregateLag, + @Nullable DateTime offsetsLastUpdated + ) { + this(dataSource, generationTime, new KafkaSupervisorReportPayload( + dataSource, + topic, + partitions, + replicas, + durationSeconds, + latestOffsets, + minimumLag, + aggregateLag, + offsetsLastUpdated + )); + } + + @Override + public KafkaSupervisorReportPayload getPayload() + { + return payload; + } + + public void addTask(TaskReportData data) + { + if (data.getType().equals(TaskReportData.TaskType.ACTIVE)) { + payload.activeTasks.add(data); + } else if (data.getType().equals(TaskReportData.TaskType.PUBLISHING)) { + payload.publishingTasks.add(data); + } else { + throw new IAE("Unknown task type [%s]", data.getType().name()); + } + } + + @Override + public String toString() + { + return "{" + + "id='" + getId() + '\'' + + ", generationTime=" + getGenerationTime() + + ", payload=" + payload + + '}'; + } +} diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/TaskReportData.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/TaskReportData.java new file mode 100644 index 0000000000..94a3f7f2ef --- /dev/null +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/TaskReportData.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.druid.json; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +import org.joda.time.DateTime; + +import java.util.Map; + +import javax.annotation.Nullable; + +/** + * This class is copied from druid source code + * in order to avoid adding additional dependencies on druid-indexing-service. + */ +public class TaskReportData +{ + public enum TaskType + { + ACTIVE, PUBLISHING, UNKNOWN + } + + private final String id; + private final Map startingOffsets; + private final DateTime startTime; + private final Long remainingSeconds; + private final TaskType type; + private final Map currentOffsets; + private final Map lag; + + public TaskReportData( + String id, + @Nullable Map startingOffsets, + @Nullable Map currentOffsets, + DateTime startTime, + Long remainingSeconds, + TaskType type, + @Nullable Map lag + ) + { + this.id = id; + this.startingOffsets = startingOffsets; + this.currentOffsets = currentOffsets; + this.startTime = startTime; + this.remainingSeconds = remainingSeconds; + this.type = type; + this.lag = lag; + } + + @JsonProperty + public String getId() + { + return id; + } + + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + public Map getStartingOffsets() + { + return startingOffsets; + } + + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + public Map getCurrentOffsets() + { + return currentOffsets; + } + + @JsonProperty + public DateTime getStartTime() + { + return startTime; + } + + @JsonProperty + public Long getRemainingSeconds() + { + return remainingSeconds; + } + + @JsonProperty + public TaskType getType() + { + return type; + } + + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + public Map getLag() + { + return lag; + } + + @Override + public String toString() + { + return "{" + + "id='" + id + '\'' + + (startingOffsets != null ? ", startingOffsets=" + startingOffsets : "") + + (currentOffsets != null ? ", currentOffsets=" + currentOffsets : "") + + ", startTime=" + startTime + + ", remainingSeconds=" + remainingSeconds + + (lag != null ? 
", lag=" + lag : "") + + '}'; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index b9b1830a7b..85d3232529 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hive.ql.exec; import static org.apache.commons.lang.StringUtils.join; -import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME; import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE; import java.io.BufferedWriter; @@ -73,7 +72,6 @@ import org.apache.hadoop.hive.metastore.DefaultHiveMetaHook; import org.apache.hadoop.hive.metastore.HiveMetaHook; import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils; -import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.PartitionDropOptions; import org.apache.hadoop.hive.metastore.StatObjectConverter; import org.apache.hadoop.hive.metastore.TableType; @@ -161,6 +159,7 @@ import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.PartitionIterable; import org.apache.hadoop.hive.ql.metadata.PrimaryKeyInfo; +import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.UniqueConstraint; import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatUtils; @@ -3627,6 +3626,7 @@ private int describeTable(Hive db, DescTableDesc descTbl) throws HiveException, NotNullConstraint nnInfo = null; DefaultConstraint dInfo = null; CheckConstraint cInfo = null; + StorageHandlerInfo storageHandlerInfo = null; if (descTbl.isExt() || descTbl.isFormatted()) { pkInfo = db.getPrimaryKeys(tbl.getDbName(), tbl.getTableName()); fkInfo = db.getForeignKeys(tbl.getDbName(), tbl.getTableName()); @@ -3634,6 +3634,7 @@ private int describeTable(Hive db, DescTableDesc descTbl) throws HiveException, nnInfo = db.getNotNullConstraints(tbl.getDbName(), tbl.getTableName()); dInfo = db.getDefaultConstraints(tbl.getDbName(), tbl.getTableName()); cInfo = db.getCheckConstraints(tbl.getDbName(), tbl.getTableName()); + storageHandlerInfo = db.getStorageHandlerInfo(tbl); } fixDecimalColumnTypeName(cols); // In case the query is served by HiveServer2, don't pad it with spaces, @@ -3642,7 +3643,7 @@ private int describeTable(Hive db, DescTableDesc descTbl) throws HiveException, formatter.describeTable(outStream, colPath, tableName, tbl, part, cols, descTbl.isFormatted(), descTbl.isExt(), isOutputPadded, colStats, - pkInfo, fkInfo, ukInfo, nnInfo, dInfo, cInfo); + pkInfo, fkInfo, ukInfo, nnInfo, dInfo, cInfo, storageHandlerInfo); LOG.debug("DDLTask: written data for {}", tableName); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 2dd1d35a12..cf01c0b2c1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -57,6 +57,7 @@ import java.util.Set; import java.util.stream.Collectors; +import javax.annotation.Nullable; import javax.jdo.JDODataStoreException; import com.google.common.collect.ImmutableList; @@ -94,7 +95,6 @@ import org.apache.hadoop.hive.common.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.hive.common.classification.InterfaceStability.Unstable; import org.apache.hadoop.hive.common.log.InPlaceUpdate; 
-import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.io.HdfsUtils; @@ -4121,29 +4121,14 @@ public static boolean isHadoop1() { private IMetaStoreClient createMetaStoreClient(boolean allowEmbedded) throws MetaException { HiveMetaHookLoader hookLoader = new HiveMetaHookLoader() { - @Override - public HiveMetaHook getHook( - org.apache.hadoop.hive.metastore.api.Table tbl) - throws MetaException { - - try { - if (tbl == null) { - return null; - } - HiveStorageHandler storageHandler = - HiveUtils.getStorageHandler(conf, - tbl.getParameters().get(META_TABLE_STORAGE)); - if (storageHandler == null) { - return null; - } - return storageHandler.getMetaHook(); - } catch (HiveException ex) { - LOG.error(StringUtils.stringifyException(ex)); - throw new MetaException( - "Failed to load storage handler: " + ex.getMessage()); - } - } - }; + @Override + public HiveMetaHook getHook( + org.apache.hadoop.hive.metastore.api.Table tbl) + throws MetaException { + HiveStorageHandler storageHandler = createStorageHandler(tbl); + return storageHandler == null ? null : storageHandler.getMetaHook(); + } + }; if (conf.getBoolVar(ConfVars.METASTORE_FASTPATH)) { return new SessionHiveMetaStoreClient(conf, hookLoader, allowEmbedded); @@ -4153,6 +4138,22 @@ public HiveMetaHook getHook( } } + @Nullable + private HiveStorageHandler createStorageHandler(org.apache.hadoop.hive.metastore.api.Table tbl) throws MetaException { + try { + if (tbl == null) { + return null; + } + HiveStorageHandler storageHandler = + HiveUtils.getStorageHandler(conf, tbl.getParameters().get(META_TABLE_STORAGE)); + return storageHandler; + } catch (HiveException ex) { + LOG.error(StringUtils.stringifyException(ex)); + throw new MetaException( + "Failed to load storage handler: " + ex.getMessage()); + } + } + public static class SchemaException extends MetaException { private static final long serialVersionUID = 1L; public SchemaException(String message) { @@ -5070,4 +5071,15 @@ public void createOrDropTriggerToPoolMapping(String resourcePlanName, String tri throw new HiveException(e); } } -}; + + @Nullable + public StorageHandlerInfo getStorageHandlerInfo(Table table) + throws HiveException { + try { + HiveStorageHandler storageHandler = createStorageHandler(table.getTTable()); + return storageHandler == null ? 
null : storageHandler.getStorageHandlerInfo(table.getTTable()); + } catch (Exception e) { + throw new HiveException(e); + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java index 99bb9f6a04..1696243aeb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java @@ -18,12 +18,15 @@ package org.apache.hadoop.hive.ql.metadata; +import java.util.Collections; import java.util.Map; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; import org.apache.hadoop.hive.metastore.HiveMetaHook; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.serde2.AbstractSerDe; import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider; @@ -149,7 +152,19 @@ public void configureTableJobProperties( * Called just before submitting MapReduce job. * * @param tableDesc descriptor for the table being accessed - * @param JobConf jobConf for MapReduce job + * @param jobConf jobConf for MapReduce job */ public void configureJobConf(TableDesc tableDesc, JobConf jobConf); + + /** + * Used to fetch runtime information about storage handler during DESCRIBE EXTENDED statement + * + * @param table table definition + * @return StorageHandlerInfo containing runtime information about storage handler + * OR `null` if the storage handler choose to not provide any runtime information. + */ + public default StorageHandlerInfo getStorageHandlerInfo(Table table) throws MetaException + { + return null; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/StorageHandlerInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/StorageHandlerInfo.java new file mode 100644 index 0000000000..dbc44a6621 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/StorageHandlerInfo.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.metadata; + +import org.apache.hadoop.hive.metastore.api.SQLForeignKey; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +/** + * StorageHandlerInfo is a marker interface used to provide runtime information associated with a storage handler. + */ +public interface StorageHandlerInfo extends Serializable { + /** + * Called from Describe Extended Statement when Formatter is Text Formatter. 
+ * @return Formatted StorageHandlerInfo as String + */ + String formatAsText(); +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/JsonMetaDataFormatter.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/JsonMetaDataFormatter.java index cd70eee26c..c21967cb9c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/JsonMetaDataFormatter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/JsonMetaDataFormatter.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hive.ql.metadata.NotNullConstraint; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.PrimaryKeyInfo; +import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.UniqueConstraint; import org.codehaus.jackson.JsonGenerator; @@ -117,7 +118,7 @@ public void describeTable(DataOutputStream out, String colPath, boolean isOutputPadded, List colStats, PrimaryKeyInfo pkInfo, ForeignKeyInfo fkInfo, UniqueConstraint ukInfo, NotNullConstraint nnInfo, DefaultConstraint dInfo, - CheckConstraint cInfo) throws HiveException { + CheckConstraint cInfo, StorageHandlerInfo storageHandlerInfo) throws HiveException { MapBuilder builder = MapBuilder.create(); builder.put("columns", makeColsUnformatted(cols)); @@ -146,6 +147,9 @@ public void describeTable(DataOutputStream out, String colPath, if (cInfo != null && !cInfo.getCheckConstraints().isEmpty()) { builder.put("checkConstraintInfo", cInfo); } + if(storageHandlerInfo != null) { + builder.put("storageHandlerInfo", storageHandlerInfo.toString()); + } } asJson(out, builder.build()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatter.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatter.java index ed2cdd11b3..d15016c5d4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatter.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.metadata.NotNullConstraint; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.PrimaryKeyInfo; +import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.UniqueConstraint; @@ -91,7 +92,8 @@ public void describeTable(DataOutputStream out, String colPath, boolean isFormatted, boolean isExt, boolean isOutputPadded, List colStats, PrimaryKeyInfo pkInfo, ForeignKeyInfo fkInfo, - UniqueConstraint ukInfo, NotNullConstraint nnInfo, DefaultConstraint dInfo, CheckConstraint cInfo) + UniqueConstraint ukInfo, NotNullConstraint nnInfo, DefaultConstraint dInfo, CheckConstraint cInfo, + StorageHandlerInfo storageHandlerInfo) throws HiveException; /** diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java index 63a2969846..2529923419 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java @@ -29,6 +29,7 @@ import java.util.Set; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; +import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo; import org.apache.hive.common.util.HiveStringUtils; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -132,7 +133,8 @@ public void describeTable(DataOutputStream outStream, String colPath, boolean isFormatted, boolean isExt, boolean isOutputPadded, List colStats, PrimaryKeyInfo pkInfo, ForeignKeyInfo fkInfo, - UniqueConstraint ukInfo, NotNullConstraint nnInfo, DefaultConstraint dInfo, CheckConstraint cInfo) + UniqueConstraint ukInfo, NotNullConstraint nnInfo, DefaultConstraint dInfo, CheckConstraint cInfo, + StorageHandlerInfo storageHandlerInfo) throws HiveException { try { List partCols = tbl.isPartitioned() ? tbl.getPartCols() : null; @@ -252,6 +254,13 @@ public void describeTable(DataOutputStream outStream, String colPath, outStream.write(terminator); } } + + if (storageHandlerInfo!= null) { + outStream.write(("StorageHandlerInfo").getBytes("UTF-8")); + outStream.write(terminator); + outStream.write(storageHandlerInfo.formatAsText().getBytes("UTF-8")); + outStream.write(terminator); + } } } } catch (IOException e) { diff --git a/ql/src/test/queries/clientpositive/druidkafkamini_basic.q b/ql/src/test/queries/clientpositive/druidkafkamini_basic.q index 38662e3d13..4026b043c3 100644 --- a/ql/src/test/queries/clientpositive/druidkafkamini_basic.q +++ b/ql/src/test/queries/clientpositive/druidkafkamini_basic.q @@ -8,7 +8,7 @@ CREATE TABLE druid_kafka_test(`__time` timestamp, page string, `user` string, la "druid.kafka.ingestion.useEarliestOffset" = "true", "druid.kafka.ingestion.maxRowsInMemory" = "5", "druid.kafka.ingestion.startDelay" = "PT1S", - "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.taskDuration" = "PT20S", "druid.kafka.ingestion.period" = "PT1S" ); @@ -17,7 +17,7 @@ ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'START' !curl -ss http://localhost:8081/druid/indexer/v1/supervisor; -- Sleep for some time for ingestion tasks to ingest events -!sleep 50; +!sleep 60; DESCRIBE druid_kafka_test; DESCRIBE EXTENDED druid_kafka_test; @@ -31,7 +31,7 @@ Select page FROM druid_kafka_test order by page; ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'RESET'); -- Sleep for some time for ingestion tasks to ingest events -!sleep 50; +!sleep 60; DESCRIBE druid_kafka_test; DESCRIBE EXTENDED druid_kafka_test; diff --git a/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out b/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out index 73eab7b5a5..df47ba2021 100644 --- a/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out +++ b/ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out @@ -8,7 +8,7 @@ PREHOOK: query: CREATE TABLE druid_kafka_test(`__time` timestamp, page string, ` "druid.kafka.ingestion.useEarliestOffset" = "true", "druid.kafka.ingestion.maxRowsInMemory" = "5", "druid.kafka.ingestion.startDelay" = "PT1S", - "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.taskDuration" = "PT20S", "druid.kafka.ingestion.period" = "PT1S" ) PREHOOK: type: CREATETABLE @@ -24,7 +24,7 @@ POSTHOOK: query: CREATE TABLE druid_kafka_test(`__time` timestamp, page string, "druid.kafka.ingestion.useEarliestOffset" = "true", "druid.kafka.ingestion.maxRowsInMemory" = "5", "druid.kafka.ingestion.startDelay" = "PT1S", - "druid.kafka.ingestion.taskDuration" = "PT30S", + "druid.kafka.ingestion.taskDuration" = "PT20S", "druid.kafka.ingestion.period" = "PT1S" ) POSTHOOK: type: CREATETABLE @@ -65,6 +65,15 @@ added int from deserializer deleted int from deserializer #### A masked pattern was here #### 
+StorageHandlerInfo +Druid Storage Handler Runtime Status for default.druid_kafka_test +kafkaPartitions=1 +activeTasks=[] +publishingTasks=[] +latestOffsets={0=10} +minimumLag={} +aggregateLag=0 +#### A masked pattern was here #### PREHOOK: query: Select count(*) FROM druid_kafka_test PREHOOK: type: QUERY PREHOOK: Input: default@druid_kafka_test @@ -126,6 +135,15 @@ added int from deserializer deleted int from deserializer #### A masked pattern was here #### +StorageHandlerInfo +Druid Storage Handler Runtime Status for default.druid_kafka_test +kafkaPartitions=1 +activeTasks=[] +publishingTasks=[] +latestOffsets={0=10} +minimumLag={} +aggregateLag=0 +#### A masked pattern was here #### PREHOOK: query: Select count(*) FROM druid_kafka_test PREHOOK: type: QUERY PREHOOK: Input: default@druid_kafka_test @@ -328,7 +346,7 @@ STAGE PLANS: druid.kafka.ingestion.maxRowsInMemory 5 druid.kafka.ingestion.period PT1S druid.kafka.ingestion.startDelay PT1S - druid.kafka.ingestion.taskDuration PT30S + druid.kafka.ingestion.taskDuration PT20S druid.kafka.ingestion.useEarliestOffset true druid.query.granularity MINUTE druid.query.json {"queryType":"scan","dataSource":"default.druid_kafka_test","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"],"filter":{"type":"not","field":{"type":"selector","dimension":"language","value":null}},"columns":["language","user"],"resultFormat":"compactedList"} @@ -366,7 +384,7 @@ STAGE PLANS: druid.kafka.ingestion.maxRowsInMemory 5 druid.kafka.ingestion.period PT1S druid.kafka.ingestion.startDelay PT1S - druid.kafka.ingestion.taskDuration PT30S + druid.kafka.ingestion.taskDuration PT20S druid.kafka.ingestion.useEarliestOffset true druid.query.granularity MINUTE druid.query.json {"queryType":"scan","dataSource":"default.druid_kafka_test","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"],"filter":{"type":"not","field":{"type":"selector","dimension":"language","value":null}},"columns":["language","user"],"resultFormat":"compactedList"} -- 2.11.0 (Apple Git-81)
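
The getStorageHandlerInfo() hook added in this patch is not Druid-specific: any HiveStorageHandler can surface runtime status through DESCRIBE EXTENDED by returning a StorageHandlerInfo whose formatAsText() output the text formatter prints under a "StorageHandlerInfo" header. Below is a minimal sketch of a third-party handler wired into the hook, assuming Hive's DefaultStorageHandler as the base class; ExampleStorageHandler, ExampleHandlerInfo and pollExternalSystem are hypothetical names used for illustration and are not part of this patch.

package org.example.hive;

import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo;

public class ExampleStorageHandler extends DefaultStorageHandler {

  /** Runtime status snapshot rendered by DESCRIBE EXTENDED via formatAsText(). */
  private static class ExampleHandlerInfo implements StorageHandlerInfo {
    private static final long serialVersionUID = 1L;

    private final String externalStatus;

    ExampleHandlerInfo(String externalStatus) {
      this.externalStatus = externalStatus;
    }

    @Override
    public String formatAsText() {
      // Whatever is returned here is written verbatim after the
      // "StorageHandlerInfo" line by TextMetaDataFormatter.
      return "externalStatus=" + externalStatus;
    }
  }

  @Override
  public StorageHandlerInfo getStorageHandlerInfo(Table table) throws MetaException {
    // Returning null leaves the DESCRIBE output unchanged, matching the
    // default implementation on HiveStorageHandler.
    String status = pollExternalSystem(table);
    return status == null ? null : new ExampleHandlerInfo(status);
  }

  // Placeholder: a real handler would query its backing system here, the way
  // DruidStorageHandler polls the Druid overlord for the Kafka supervisor report.
  private String pollExternalSystem(Table table) {
    return "OK";
  }
}

For a table created with STORED BY such a handler, DESCRIBE EXTENDED prints the formatAsText() text below a StorageHandlerInfo line, as the druidkafkamini_basic.q.out changes above show for the Druid Kafka table; the JSON formatter instead emits the value under a "storageHandlerInfo" key.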