diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java index 6dc97d53b7..82ca5c9dfb 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandlerUtils.java @@ -672,7 +672,7 @@ static int getIntegerProperty(Table table, String propertyName, int defaultVal) } @Nullable public static List getListProperty(Table table, String propertyName) { - List rv = new ArrayList(); + List rv = new ArrayList<>(); String values = getTableProperty(table, propertyName); if(values == null) { return null; diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java index c1e0e75f98..a6ef47e091 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java @@ -45,6 +45,8 @@ import org.apache.hadoop.hive.druid.serde.DruidTopNQueryRecordReader; import org.apache.hadoop.hive.druid.serde.DruidWritable; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedSupport; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.JobConf; @@ -60,7 +62,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; import java.io.IOException; import java.io.InputStream; import java.net.URL; @@ -75,11 +76,11 @@ * and parse the results. */ public class DruidQueryBasedInputFormat extends InputFormat - implements org.apache.hadoop.mapred.InputFormat { + implements org.apache.hadoop.mapred.InputFormat , VectorizedInputFormatInterface { protected static final Logger LOG = LoggerFactory.getLogger(DruidQueryBasedInputFormat.class); - @Nullable public static DruidQueryRecordReader getDruidQueryReader(String druidQueryType) { + public static DruidQueryRecordReader getDruidQueryReader(String druidQueryType) { switch (druidQueryType) { case Query.TIMESERIES: return new DruidTimeseriesQueryRecordReader(); @@ -92,7 +93,7 @@ case Query.SCAN: return new DruidScanQueryRecordReader(); default: - return null; + throw new IllegalStateException("Druid query type " + druidQueryType + " not recognized"); } } @@ -284,23 +285,18 @@ private static String withQueryId(String druidQuery, String queryId) throws IOEx @Override public org.apache.hadoop.mapred.RecordReader getRecordReader( org.apache.hadoop.mapred.InputSplit split, JobConf job, Reporter reporter - ) - throws IOException { + ) throws IOException { // We need to provide a different record reader for every type of Druid query. // The reason is that Druid results format is different for each type. final DruidQueryRecordReader reader; - final String druidQueryType = job.get(Constants.DRUID_QUERY_TYPE); - if (druidQueryType == null) { - reader = new DruidScanQueryRecordReader(); // By default we use scan query as fallback. - reader.initialize((HiveDruidSplit) split, job); - return reader; - } - + // By default, we use druid scan query as fallback. 
+ final String druidQueryType = job.get(Constants.DRUID_QUERY_TYPE, Query.SCAN); reader = getDruidQueryReader(druidQueryType); - if (reader == null) { - throw new IOException("Druid query type " + druidQueryType + " not recognized"); - } reader.initialize((HiveDruidSplit) split, job); + if (Utilities.getIsVectorized(job)) { + //noinspection unchecked + return (org.apache.hadoop.mapred.RecordReader) new DruidVectorizedWrapper(reader, job); + } return reader; } @@ -308,18 +304,15 @@ private static String withQueryId(String druidQuery, String queryId) throws IOEx public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context ) throws IOException, InterruptedException { + // By default, we use druid scan query as fallback. + final String druidQueryType = context.getConfiguration().get(Constants.DRUID_QUERY_TYPE, Query.SCAN); // We need to provide a different record reader for every type of Druid query. // The reason is that Druid results format is different for each type. - final String druidQueryType = context.getConfiguration().get(Constants.DRUID_QUERY_TYPE); - if (druidQueryType == null) { - return new DruidScanQueryRecordReader(); // By default, we use druid scan query as fallback. - } - final DruidQueryRecordReader reader = - getDruidQueryReader(druidQueryType); - if (reader == null) { - throw new IOException("Druid query type " + druidQueryType + " not recognized"); - } - return reader; + //noinspection unchecked + return getDruidQueryReader(druidQueryType); } + @Override public VectorizedSupport.Support[] getSupportedFeatures() { + return new VectorizedSupport.Support[0]; + } } diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidVectorizedWrapper.java druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidVectorizedWrapper.java new file mode 100644 index 0000000000..3ba0f2fb13 --- /dev/null +++ druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidVectorizedWrapper.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.druid.io; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader; +import org.apache.hadoop.hive.druid.serde.DruidSerDe; +import org.apache.hadoop.hive.druid.serde.DruidWritable; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.vector.VectorAssignRow; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.RecordReader; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Properties; + +public class DruidVectorizedWrapper<T extends Comparable<T>> implements RecordReader<NullWritable, VectorizedRowBatch> { + private final VectorAssignRow vectorAssignRow = new VectorAssignRow(); + private final DruidQueryRecordReader<T> baseReader; + private final VectorizedRowBatchCtx rbCtx; + private final DruidSerDe serDe; + + /** + * Actual projected columns needed by the query; this can be empty for a query like: select count(*) from src; + */ + private final int[] projectedColumns; + + private final DruidWritable druidWritable; + + public DruidVectorizedWrapper(DruidQueryRecordReader<T> reader, Configuration jobConf) { + this.rbCtx = Utilities.getVectorizedRowBatchCtx(jobConf); + if (rbCtx.getDataColumnNums() != null) { + projectedColumns = rbCtx.getDataColumnNums(); + } else { + // case where all the columns are selected + projectedColumns = new int[rbCtx.getRowColumnTypeInfos().length]; + for (int i = 0; i < projectedColumns.length; i++) { + projectedColumns[i] = i; + } + } + this.serDe = createAndInitializeSerde(jobConf); + this.baseReader = Preconditions.checkNotNull(reader); + + // initialize the row assigner from the SerDe's object inspector + try { + vectorAssignRow.init((StructObjectInspector) serDe.getObjectInspector()); + } catch (HiveException e) { + throw new RuntimeException(e); + } + + druidWritable = baseReader.createValue(); + } + + @Override public boolean next(NullWritable nullWritable, VectorizedRowBatch vectorizedRowBatch) throws IOException { + vectorizedRowBatch.reset(); + ArrayList<Object> row; + int rowsCount = 0; + while (rowsCount < vectorizedRowBatch.getMaxSize() && baseReader.next(nullWritable, druidWritable)) { + if (projectedColumns.length > 0) { + try { + row = serDe.doDeserialize(druidWritable); + } catch (SerDeException e) { + throw new IOException(e); + } + for (int i : projectedColumns) { + vectorAssignRow.assignRowColumn(vectorizedRowBatch, rowsCount, i, row.get(i)); + } + } + rowsCount++; + } + vectorizedRowBatch.size = rowsCount; + return rowsCount > 0; + } + + @Override public NullWritable createKey() { + return NullWritable.get(); + } + + @Override public VectorizedRowBatch createValue() { + return rbCtx.createVectorizedRowBatch(); + } + + @Override public long getPos() throws IOException { + return baseReader.getPos(); + } + + @Override public void close() throws IOException { + baseReader.close(); + } + + @Override public float getProgress() throws IOException { + return baseReader.getProgress(); + } + + private static DruidSerDe createAndInitializeSerde(Configuration jobConf) { + DruidSerDe serDe = new DruidSerDe(); + MapWork mapWork =
Preconditions.checkNotNull(Utilities.getMapWork(jobConf), "Map work is null"); + Properties + properties = + mapWork.getPartitionDescs() + .stream() + .map(partitionDesc -> partitionDesc.getTableDesc().getProperties()) + .findAny() + .orElseThrow(() -> new RuntimeException("Can not find table properties in the map work")); + try { + serDe.initialize(jobConf, properties, null); + } catch (SerDeException e) { + throw new RuntimeException("Can not initialize the serde", e); + } + return serDe; + } +} diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroBytesDecoder.java druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroBytesDecoder.java index 3a1dbf7229..5f1a87ffd5 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroBytesDecoder.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroBytesDecoder.java @@ -20,9 +20,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; -import org.apache.avro.generic.GenericRecord; -import java.nio.ByteBuffer; /** * This class is copied from druid source code diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroStreamInputRowParser.java druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroStreamInputRowParser.java index d6e6624669..8c2067ca65 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroStreamInputRowParser.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroStreamInputRowParser.java @@ -25,6 +25,7 @@ import io.druid.data.input.InputRow; import io.druid.data.input.impl.ParseSpec; +import javax.validation.constraints.NotNull; import java.nio.ByteBuffer; import java.util.List; import java.util.Objects; @@ -48,7 +49,7 @@ public AvroStreamInputRowParser( this.avroBytesDecoder = Preconditions.checkNotNull(avroBytesDecoder, "avroBytesDecoder"); } - @Override + @NotNull @Override public List parseBatch(ByteBuffer input) { throw new UnsupportedOperationException("This class is only used for JSON serde"); diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorIOConfig.java druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorIOConfig.java index c1b3bf8d41..fba591792d 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorIOConfig.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorIOConfig.java @@ -43,8 +43,10 @@ private final Duration period; private final boolean useEarliestOffset; private final Duration completionTimeout; - @SuppressWarnings("OptionalUsedAsFieldOrParameterType") private final Optional lateMessageRejectionPeriod; - @SuppressWarnings("OptionalUsedAsFieldOrParameterType") private final Optional earlyMessageRejectionPeriod; + @SuppressWarnings({ "OptionalUsedAsFieldOrParameterType", "Guava" }) private final Optional + lateMessageRejectionPeriod; + @SuppressWarnings({ "OptionalUsedAsFieldOrParameterType", "Guava" }) private final Optional + earlyMessageRejectionPeriod; private final boolean skipOffsetGaps; @JsonCreator public KafkaSupervisorIOConfig(@JsonProperty("topic") String topic, @@ -71,10 +73,12 @@ this.period = defaultDuration(period, "PT30S"); this.useEarliestOffset = useEarliestOffset != null ? useEarliestOffset : false; this.completionTimeout = defaultDuration(completionTimeout, "PT30M"); + //noinspection Guava this.lateMessageRejectionPeriod = lateMessageRejectionPeriod == null ?
Optional.absent() : Optional.of(lateMessageRejectionPeriod.toStandardDuration()); + //noinspection Guava this.earlyMessageRejectionPeriod = earlyMessageRejectionPeriod == null ? Optional.absent() : @@ -118,11 +122,11 @@ return completionTimeout; } - @JsonProperty public Optional getEarlyMessageRejectionPeriod() { + @SuppressWarnings("Guava") @JsonProperty public Optional getEarlyMessageRejectionPeriod() { return earlyMessageRejectionPeriod; } - @JsonProperty public Optional getLateMessageRejectionPeriod() { + @SuppressWarnings("Guava") @JsonProperty public Optional getLateMessageRejectionPeriod() { return lateMessageRejectionPeriod; } diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java index 53d74417f8..daebdcfa2d 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java @@ -80,25 +80,23 @@ initialize(split, context.getConfiguration()); } - public void initialize(InputSplit split, - Configuration conf, - ObjectMapper mapper, - ObjectMapper smileMapper, - HttpClient httpClient) throws IOException { + public void initialize(InputSplit split, ObjectMapper mapper, ObjectMapper smileMapper, HttpClient httpClient) + throws IOException { HiveDruidSplit hiveDruidSplit = (HiveDruidSplit) split; Preconditions.checkNotNull(hiveDruidSplit, "input split is null ???"); - ObjectMapper mapper1 = Preconditions.checkNotNull(mapper, "object Mapper can not be null"); + Preconditions.checkNotNull(httpClient, "need Http Client can not be null"); + ObjectMapper objectMapper = Preconditions.checkNotNull(mapper, "object Mapper can not be null"); // Smile mapper is used to read query results that are serialized as binary instead of json // Smile mapper is used to read query results that are serialized as binary instead of json - ObjectMapper smileMapper1 = Preconditions.checkNotNull(smileMapper, "Smile Mapper can not be null"); + ObjectMapper smileObjectMapper = Preconditions.checkNotNull(smileMapper, "Smile Mapper can not be null"); // Create query - this.query = mapper1.readValue(Preconditions.checkNotNull(hiveDruidSplit.getDruidQuery()), Query.class); + this.query = objectMapper.readValue(Preconditions.checkNotNull(hiveDruidSplit.getDruidQuery()), Query.class); Preconditions.checkNotNull(query); /* Result type definition used to read the rows, this is query dependent. 
*/ JavaType resultsType = getResultTypeDef(); - HttpClient httpClient1 = Preconditions.checkNotNull(httpClient, "need Http Client"); + final String[] locations = hiveDruidSplit.getLocations(); boolean initialized = false; int currentLocationIndex = 0; @@ -112,10 +110,14 @@ public void initialize(InputSplit split, LOG.debug("Retrieving data from druid location[{}] using query:[{}] ", address, query); try { Request request = DruidStorageHandlerUtils.createSmileRequest(address, query); - Future inputStreamFuture = httpClient1.go(request, new InputStreamResponseHandler()); + Future inputStreamFuture = httpClient.go(request, new InputStreamResponseHandler()); //noinspection unchecked queryResultsIterator = - new JsonParserIterator(smileMapper1, resultsType, inputStreamFuture, request.getUrl().toString(), query); + new JsonParserIterator(smileObjectMapper, + resultsType, + inputStreamFuture, + request.getUrl().toString(), + query); queryResultsIterator.init(); initialized = true; } catch (IOException | ExecutionException | InterruptedException e) { @@ -146,7 +148,6 @@ public void initialize(InputSplit split, public void initialize(InputSplit split, Configuration conf) throws IOException { initialize(split, - conf, DruidStorageHandlerUtils.JSON_MAPPER, DruidStorageHandlerUtils.SMILE_MAPPER, DruidStorageHandler.getHttpClient()); @@ -175,8 +176,7 @@ public void initialize(InputSplit split, Configuration conf) throws IOException @Override public abstract NullWritable getCurrentKey() throws IOException, InterruptedException; - @Override - public abstract DruidWritable getCurrentValue() throws IOException, InterruptedException; + @Override public abstract DruidWritable getCurrentValue() throws IOException, InterruptedException; @Override public abstract float getProgress() throws IOException; diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java index 516faf0814..944128c187 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java @@ -406,8 +406,20 @@ protected SegmentAnalysis submitMetadataRequest(String address, SegmentMetadataQ } @Override public Object deserialize(Writable writable) throws SerDeException { + return doDeserialize(writable); + } + + /** + * This method is not really meant to be a public API, it is used by + * {@link org.apache.hadoop.hive.druid.io.DruidVectorizedWrapper} to Deserialize druid Writable. + * + * @param writable + * @return Hive Row as an array list. + * @throws SerDeException + */ + public ArrayList doDeserialize(Writable writable) throws SerDeException { final DruidWritable input = (DruidWritable) writable; - final List output = Lists.newArrayListWithExpectedSize(columns.length); + final ArrayList output = Lists.newArrayListWithExpectedSize(columns.length); for (int i = 0; i < columns.length; i++) { final Object value = input.isCompacted() ? 
input.getCompactedValue().get(i) : input.getValue().get(columns[i]); if (value == null) { diff --git druid-handler/src/test/org/apache/hadoop/hive/druid/serde/TestDruidSerDe.java druid-handler/src/test/org/apache/hadoop/hive/druid/serde/TestDruidSerDe.java index e27f8cf0e8..d519214495 100644 --- druid-handler/src/test/org/apache/hadoop/hive/druid/serde/TestDruidSerDe.java +++ druid-handler/src/test/org/apache/hadoop/hive/druid/serde/TestDruidSerDe.java @@ -777,9 +777,7 @@ private static Properties createPropertiesQuery(String dataSource, final HiveDruidSplit split = new HiveDruidSplit(jsonQuery, new Path("empty"), new String[] { "testing_host" }); assert reader != null; - reader.initialize(split, - new Configuration(), - DruidStorageHandlerUtils.JSON_MAPPER, + reader.initialize(split, DruidStorageHandlerUtils.JSON_MAPPER, DruidStorageHandlerUtils.SMILE_MAPPER, httpClient); StructObjectInspector oi = (StructObjectInspector) serDe.getObjectInspector(); @@ -807,9 +805,7 @@ private static Properties createPropertiesQuery(String dataSource, when(httpClient.go(anyObject(), any(HttpResponseHandler.class))).thenReturn(futureResult); reader = DruidQueryBasedInputFormat.getDruidQueryReader(queryType); assert reader != null; - reader.initialize(split, - new Configuration(), - DruidStorageHandlerUtils.JSON_MAPPER, + reader.initialize(split, DruidStorageHandlerUtils.JSON_MAPPER, DruidStorageHandlerUtils.SMILE_MAPPER, httpClient); diff --git ql/src/test/queries/clientpositive/druidkafkamini_basic.q ql/src/test/queries/clientpositive/druidkafkamini_basic.q index 73593ef0b3..1e999df00e 100644 --- ql/src/test/queries/clientpositive/druidkafkamini_basic.q +++ ql/src/test/queries/clientpositive/druidkafkamini_basic.q @@ -1,4 +1,4 @@ -SET hive.vectorized.execution.enabled=false; +SET hive.vectorized.execution.enabled=true ; CREATE EXTERNAL TABLE druid_kafka_test(`__time` timestamp, page string, `user` string, language string, added int, deleted int) STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' TBLPROPERTIES ( diff --git ql/src/test/queries/clientpositive/druidmini_dynamic_partition.q ql/src/test/queries/clientpositive/druidmini_dynamic_partition.q index 1b208625e7..36a68d0661 100644 --- ql/src/test/queries/clientpositive/druidmini_dynamic_partition.q +++ ql/src/test/queries/clientpositive/druidmini_dynamic_partition.q @@ -1,5 +1,5 @@ --! qt:dataset:alltypesorc -SET hive.vectorized.execution.enabled=false; +SET hive.vectorized.execution.enabled=true ; SET hive.ctas.external.tables=true; SET hive.external.table.purge.default = true; CREATE EXTERNAL TABLE druid_partitioned_table_0 diff --git ql/src/test/queries/clientpositive/druidmini_expressions.q ql/src/test/queries/clientpositive/druidmini_expressions.q index a39cd13cca..2ba8a669f5 100644 --- ql/src/test/queries/clientpositive/druidmini_expressions.q +++ ql/src/test/queries/clientpositive/druidmini_expressions.q @@ -1,7 +1,7 @@ --! qt:dataset:druid_table_alltypesorc SET hive.ctas.external.tables=true; -SET hive.vectorized.execution.enabled=false; +SET hive.vectorized.execution.enabled=true; -- MATH AND STRING functions @@ -56,16 +56,27 @@ EXPLAIN select count(DISTINCT cstring2) FROM druid_table_alltypesorc ; EXPLAIN select count(DISTINCT cstring2), sum(cdouble) FROM druid_table_alltypesorc ; EXPLAIN select count(distinct cstring2 || '_'|| cstring1), sum(cdouble), min(cint) FROM druid_table_alltypesorc; +-- Force the scan query to test limit push down. 
+EXPLAIN select count(*) from (select `__time` from druid_table_alltypesorc limit 1025) as src; + select count(DISTINCT cstring2), sum(cdouble) FROM druid_table_alltypesorc GROUP BY floor_year(`__time`) ; select count(distinct cstring2), sum(2 * cdouble) FROM druid_table_alltypesorc GROUP BY floor_year(`__time`) ; select count(DISTINCT cstring2) FROM druid_table_alltypesorc ; +select count(DISTINCT cstring1) FROM druid_table_alltypesorc ; + select count(DISTINCT cstring2), sum(cdouble) FROM druid_table_alltypesorc ; select count(distinct cstring2 || '_'|| cstring1), sum(cdouble), min(cint) FROM druid_table_alltypesorc; +-- Force a scan query to test Vectorized path +select count(*) from (select `__time` from druid_table_alltypesorc limit 1025) as src; + +-- Force a scan query to test Vectorized path +select count(*) from (select `__time` from druid_table_alltypesorc limit 200000) as src; + explain select unix_timestamp(from_unixtime(1396681200)) from druid_table_alltypesorc limit 1; select unix_timestamp(from_unixtime(1396681200)) from druid_table_alltypesorc limit 1; diff --git ql/src/test/queries/clientpositive/druidmini_extractTime.q ql/src/test/queries/clientpositive/druidmini_extractTime.q index 03afbe24f2..d951250f60 100644 --- ql/src/test/queries/clientpositive/druidmini_extractTime.q +++ ql/src/test/queries/clientpositive/druidmini_extractTime.q @@ -1,5 +1,5 @@ --! qt:dataset:druid_table_alltypesorc -SET hive.vectorized.execution.enabled=false; +SET hive.vectorized.execution.enabled=true ; SET hive.ctas.external.tables=true; SET hive.external.table.purge.default = true; diff --git ql/src/test/queries/clientpositive/druidmini_floorTime.q ql/src/test/queries/clientpositive/druidmini_floorTime.q index b0dce67b50..ffe9b7981d 100644 --- ql/src/test/queries/clientpositive/druidmini_floorTime.q +++ ql/src/test/queries/clientpositive/druidmini_floorTime.q @@ -1,5 +1,5 @@ --! qt:dataset:druid_table_alltypesorc -SET hive.vectorized.execution.enabled=false; +SET hive.vectorized.execution.enabled=true ; SET hive.ctas.external.tables=true; SET hive.external.table.purge.default = true; diff --git ql/src/test/queries/clientpositive/druidmini_joins.q ql/src/test/queries/clientpositive/druidmini_joins.q index 1f92a0d1f0..a602887599 100644 --- ql/src/test/queries/clientpositive/druidmini_joins.q +++ ql/src/test/queries/clientpositive/druidmini_joins.q @@ -1,4 +1,4 @@ -SET hive.vectorized.execution.enabled=false; + SET hive.explain.user=false; SET hive.ctas.external.tables=true; SET hive.external.table.purge.default = true; diff --git ql/src/test/queries/clientpositive/druidmini_test1.q ql/src/test/queries/clientpositive/druidmini_test1.q index a4031c6ab1..8bb0062a84 100644 --- ql/src/test/queries/clientpositive/druidmini_test1.q +++ ql/src/test/queries/clientpositive/druidmini_test1.q @@ -1,5 +1,6 @@ --! 
qt:dataset:druid_table_alltypesorc SET hive.ctas.external.tables=true; +SET hive.vectorized.execution.enabled=true; SET hive.external.table.purge.default = true; -- Time Series Query diff --git ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out index 14522fb687..7a17a387d2 100644 --- ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out +++ ql/src/test/results/clientpositive/druid/druidkafkamini_basic.q.out @@ -259,7 +259,7 @@ STAGE PLANS: tag: 0 value expressions: _col0 (type: string) auto parallelism: true - Execution mode: llap + Execution mode: vectorized, llap LLAP IO: no inputs Path -> Alias: hdfs://### HDFS PATH ### [languages] @@ -336,7 +336,7 @@ STAGE PLANS: tag: 1 value expressions: user (type: string) auto parallelism: true - Execution mode: llap + Execution mode: vectorized, llap LLAP IO: no inputs Path -> Alias: hdfs://### HDFS PATH ### [druid_kafka_test] diff --git ql/src/test/results/clientpositive/druid/druidmini_expressions.q.out ql/src/test/results/clientpositive/druid/druidmini_expressions.q.out index b07ed52d06..e53079a21f 100644 --- ql/src/test/results/clientpositive/druid/druidmini_expressions.q.out +++ ql/src/test/results/clientpositive/druid/druidmini_expressions.q.out @@ -226,7 +226,7 @@ STAGE PLANS: tag: -1 value expressions: _col0 (type: bigint) auto parallelism: false - Execution mode: llap + Execution mode: vectorized, llap LLAP IO: no inputs Path -> Alias: hdfs://### HDFS PATH ### [druid_table_alltypesorc] @@ -304,7 +304,7 @@ STAGE PLANS: Truncated Path -> Alias: /druid_table_alltypesorc [druid_table_alltypesorc] Reducer 2 - Execution mode: llap + Execution mode: vectorized, llap Needs Tagging: false Reduce Operator Tree: Group By Operator @@ -688,10 +688,10 @@ STAGE PLANS: sort order: Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE value expressions: _col0 (type: bigint) - Execution mode: llap + Execution mode: vectorized, llap LLAP IO: no inputs Reducer 2 - Execution mode: llap + Execution mode: vectorized, llap Reduce Operator Tree: Group By Operator aggregations: count(VALUE._col0) @@ -751,10 +751,10 @@ STAGE PLANS: sort order: Statistics: Num rows: 1 Data size: 208 Basic stats: COMPLETE Column stats: NONE value expressions: _col0 (type: bigint), _col1 (type: double) - Execution mode: llap + Execution mode: vectorized, llap LLAP IO: no inputs Reducer 2 - Execution mode: llap + Execution mode: vectorized, llap Reduce Operator Tree: Group By Operator aggregations: count(VALUE._col0), sum(VALUE._col1) @@ -814,10 +814,10 @@ STAGE PLANS: sort order: Statistics: Num rows: 1 Data size: 216 Basic stats: COMPLETE Column stats: NONE value expressions: _col0 (type: bigint), _col1 (type: double), _col2 (type: int) - Execution mode: llap + Execution mode: vectorized, llap LLAP IO: no inputs Reducer 2 - Execution mode: llap + Execution mode: vectorized, llap Reduce Operator Tree: Group By Operator aggregations: count(VALUE._col0), sum(VALUE._col1), min(VALUE._col2) @@ -838,6 +838,71 @@ STAGE PLANS: Processor Tree: ListSink +PREHOOK: query: EXPLAIN select count(*) from (select `__time` from druid_table_alltypesorc limit 1025) as src +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_table_alltypesorc +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: EXPLAIN select count(*) from (select `__time` from druid_table_alltypesorc limit 1025) as src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_table_alltypesorc +POSTHOOK: 
Output: hdfs://### HDFS PATH ### +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: druid_table_alltypesorc + properties: + druid.fieldNames vc + druid.fieldTypes int + druid.query.json {"queryType":"scan","dataSource":"default.druid_table_alltypesorc","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"],"virtualColumns":[{"type":"expression","name":"vc","expression":"0","outputType":"LONG"}],"columns":["vc"],"resultFormat":"compactedList","limit":1025} + druid.query.type scan + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE + Select Operator + Statistics: Num rows: 9173 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: PARTIAL Column stats: COMPLETE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: PARTIAL Column stats: COMPLETE + value expressions: _col0 (type: bigint) + Execution mode: vectorized, llap + LLAP IO: no inputs + Reducer 2 + Execution mode: vectorized, llap + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: PARTIAL Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: PARTIAL Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + PREHOOK: query: select count(DISTINCT cstring2), sum(cdouble) FROM druid_table_alltypesorc GROUP BY floor_year(`__time`) PREHOOK: type: QUERY PREHOOK: Input: default@druid_table_alltypesorc @@ -865,6 +930,15 @@ POSTHOOK: type: QUERY POSTHOOK: Input: default@druid_table_alltypesorc POSTHOOK: Output: hdfs://### HDFS PATH ### 6078 +PREHOOK: query: select count(DISTINCT cstring1) FROM druid_table_alltypesorc +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_table_alltypesorc +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select count(DISTINCT cstring1) FROM druid_table_alltypesorc +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_table_alltypesorc +POSTHOOK: Output: hdfs://### HDFS PATH ### +3025 PREHOOK: query: select count(DISTINCT cstring2), sum(cdouble) FROM druid_table_alltypesorc PREHOOK: type: QUERY PREHOOK: Input: default@druid_table_alltypesorc @@ -883,6 +957,24 @@ POSTHOOK: type: QUERY POSTHOOK: Input: default@druid_table_alltypesorc POSTHOOK: Output: hdfs://### HDFS PATH ### 6095 2.7308662809692383E7 -1073279343 +PREHOOK: query: select count(*) from (select `__time` from druid_table_alltypesorc limit 1025) as src +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_table_alltypesorc +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select count(*) from (select `__time` from druid_table_alltypesorc limit 1025) as src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_table_alltypesorc +POSTHOOK: Output: hdfs://### HDFS PATH ### +1025 +PREHOOK: query: select count(*) from (select 
`__time` from druid_table_alltypesorc limit 200000) as src +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_table_alltypesorc +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select count(*) from (select `__time` from druid_table_alltypesorc limit 200000) as src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_table_alltypesorc +POSTHOOK: Output: hdfs://### HDFS PATH ### +6105 PREHOOK: query: explain select unix_timestamp(from_unixtime(1396681200)) from druid_table_alltypesorc limit 1 PREHOOK: type: QUERY PREHOOK: Input: default@druid_table_alltypesorc @@ -1284,10 +1376,10 @@ STAGE PLANS: sort order: ++ Statistics: Num rows: 9173 Data size: 976192 Basic stats: COMPLETE Column stats: NONE TopN Hash Memory Usage: 0.1 - Execution mode: llap + Execution mode: vectorized, llap LLAP IO: no inputs Reducer 2 - Execution mode: llap + Execution mode: vectorized, llap Reduce Operator Tree: Select Operator expressions: KEY.reducesinkkey0 (type: date), KEY.reducesinkkey1 (type: date)
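A minimal, hypothetical sketch of how the new DruidVectorizedWrapper is driven once DruidQueryBasedInputFormat.getRecordReader returns it for a vectorized task. It assumes a DruidQueryRecordReader already initialized with its HiveDruidSplit, and a JobConf that carries the vectorized MapWork (so Utilities.getVectorizedRowBatchCtx and Utilities.getMapWork resolve); the class and method names below (DruidVectorizedWrapperSketch, countRows) are illustrative only and not part of the patch.

import org.apache.hadoop.hive.druid.io.DruidVectorizedWrapper;
import org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;

import java.io.IOException;

final class DruidVectorizedWrapperSketch {
  // Drains the wrapper batch by batch, the way a vectorized map task would.
  static long countRows(DruidQueryRecordReader reader, JobConf job) throws IOException {
    DruidVectorizedWrapper wrapper = new DruidVectorizedWrapper(reader, job);
    try {
      NullWritable key = wrapper.createKey();
      // The batch shape comes from the VectorizedRowBatchCtx stored in the MapWork.
      VectorizedRowBatch batch = wrapper.createValue();
      long rows = 0;
      while (wrapper.next(key, batch)) {
        // Only the projected columns are populated; for a count(*)-style query the
        // columns stay empty but batch.size still reflects the rows consumed.
        rows += batch.size;
      }
      return rows;
    } finally {
      wrapper.close();
    }
  }
}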