diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidVectorizedWrapper.java druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidVectorizedWrapper.java
index c07c24d755..5db6be8f4b 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidVectorizedWrapper.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidVectorizedWrapper.java
@@ -35,11 +35,10 @@ import org.apache.hadoop.mapred.RecordReader;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Properties;
 
 /**
- * A Wrapper class that takes a row-by-row Druid Record Reader and provides a Vectorized one.
+ * A Wrapper class that consumes rows from the base Druid Record Reader and provides a Vectorized one.
  * @param <T> type of the Druid query.
  */
 public class DruidVectorizedWrapper<T extends Comparable<T>> implements RecordReader<NullWritable, VectorizedRowBatch> {
@@ -47,6 +46,7 @@ private final DruidQueryRecordReader baseReader;
   private final VectorizedRowBatchCtx rbCtx;
   private final DruidSerDe serDe;
+  private final Object[] rowBoat;
 
   /**
    * Actual projected columns needed by the query, this can be empty in case of query like: select count(*) from src.
@@ -77,21 +77,21 @@ public DruidVectorizedWrapper(DruidQueryRecordReader reader, Configuration jobCo
     }
     druidWritable = baseReader.createValue();
+    rowBoat = new Object[projectedColumns.length];
   }
 
   @Override public boolean next(NullWritable nullWritable, VectorizedRowBatch vectorizedRowBatch) throws IOException {
     vectorizedRowBatch.reset();
-    ArrayList<Object> row;
     int rowsCount = 0;
     while (rowsCount < vectorizedRowBatch.getMaxSize() && baseReader.next(nullWritable, druidWritable)) {
       if (projectedColumns.length > 0) {
         try {
-          row = serDe.deserializeAsPrimitive(druidWritable);
+          serDe.deserializeAsPrimitive(druidWritable, rowBoat);
         } catch (SerDeException e) {
           throw new IOException(e);
         }
         for (int i : projectedColumns) {
-          vectorAssignRow.assignRowColumn(vectorizedRowBatch, rowsCount, i, row.get(i));
+          vectorAssignRow.assignRowColumn(vectorizedRowBatch, rowsCount, i, rowBoat[i]);
         }
       }
       rowsCount++;
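The next() loop above fills a VectorizedRowBatch by reusing one pre-sized rowBoat array for the whole reader instead of allocating an ArrayList per row. Below is a minimal, dependency-free sketch of that buffer-reuse pattern; RowSource, fillBatch and the Object[][] batch are hypothetical stand-ins for DruidQueryRecordReader, next() and VectorizedRowBatch, not the actual Hive API.

import java.util.Arrays;

/** Minimal sketch of the row-boat reuse pattern; every name here is illustrative only. */
public final class RowBoatSketch {

  /** Hypothetical stand-in for the row-by-row reader (DruidQueryRecordReader in the patch). */
  interface RowSource {
    /** Fills {@code boat} with the next row's column values; returns false when exhausted. */
    boolean next(Object[] boat);
  }

  /**
   * Drains up to {@code maxBatchSize} rows into {@code batch}, reusing one row boat for the
   * whole reader instead of allocating a fresh container per row.
   */
  static int fillBatch(RowSource source, Object[][] batch, Object[] rowBoat, int maxBatchSize) {
    int rowsCount = 0;
    while (rowsCount < maxBatchSize && source.next(rowBoat)) {
      // Copy the boat's contents out; the boat itself is overwritten on the next call.
      batch[rowsCount] = Arrays.copyOf(rowBoat, rowBoat.length);
      rowsCount++;
    }
    return rowsCount;
  }

  public static void main(String[] args) {
    // Fake source producing three two-column rows.
    int[] produced = {0};
    RowSource source = boat -> {
      if (produced[0] >= 3) {
        return false;
      }
      boat[0] = produced[0];
      boat[1] = "row-" + produced[0];
      produced[0]++;
      return true;
    };

    Object[] rowBoat = new Object[2];       // sized once, like rowBoat in the wrapper constructor
    Object[][] batch = new Object[1024][];  // stand-in for VectorizedRowBatch
    int filled = fillBatch(source, batch, rowBoat, batch.length);
    System.out.println("rows in batch: " + filled);               // 3
    System.out.println("first row: " + Arrays.toString(batch[0]));
  }
}

The wrapper owns the array for the reader's whole lifetime, which is what makes the single allocation in the constructor safe.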
diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
index c751932dd2..b9909f29b5 100644
--- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
+++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
@@ -483,79 +483,77 @@ protected SegmentAnalysis submitMetadataRequest(String address, SegmentMetadataQ
    * Function to convert Druid Primitive values to Hive Primitives. Main usage of this is to pipe data to VectorRow.
    * This has the exact same logic as {@link DruidSerDe#deserialize(Writable)}, any modification here should be done
    * there as well.
-   * Reason to have 2 function is performance, merging both will bring an extra test on the hot loop.
+   * The reason to have two functions is to avoid extra object allocations.
    *
-   * @param writable Druid Writable container.
-   * @return ArrayList of Hive Primitives.
+   * @param writable Druid Writable.
+   * @param output row boat used to carry the column values.
+   * @throws SerDeException in case of deserialization errors.
    */
-  public ArrayList<Object> deserializeAsPrimitive(Writable writable) throws SerDeException {
+  public void deserializeAsPrimitive(Writable writable, final Object[] output) throws SerDeException {
     final DruidWritable input = (DruidWritable) writable;
-    final ArrayList<Object> output = Lists.newArrayListWithExpectedSize(columns.length);
-
     for (int i = 0; i < columns.length; i++) {
       final Object value = input.isCompacted() ? input.getCompactedValue().get(i) : input.getValue().get(columns[i]);
       if (value == null) {
-        output.add(null);
+        output[i] = null;
         continue;
       }
       switch (types[i].getPrimitiveCategory()) {
       case TIMESTAMP:
-        output.add(Timestamp.ofEpochMilli(deserializeToMillis(value)));
+        output[i] = Timestamp.ofEpochMilli(deserializeToMillis(value));
         break;
       case TIMESTAMPLOCALTZ:
-        output.add(new TimestampTZ(ZonedDateTime.ofInstant(Instant.ofEpochMilli(deserializeToMillis(value)),
-            ((TimestampLocalTZTypeInfo) types[i]).timeZone())));
+        output[i] = new TimestampTZ(ZonedDateTime.ofInstant(Instant.ofEpochMilli(deserializeToMillis(value)),
+            ((TimestampLocalTZTypeInfo) types[i]).timeZone()));
         break;
       case DATE:
-        output.add(Date.ofEpochMilli(deserializeToMillis(value)));
+        output[i] = Date.ofEpochMilli(deserializeToMillis(value));
         break;
       case BYTE:
-        output.add(((Number) value).byteValue());
+        output[i] = ((Number) value).byteValue();
         break;
       case SHORT:
-        output.add(((Number) value).shortValue());
+        output[i] = ((Number) value).shortValue();
         break;
       case INT:
         if (value instanceof Number) {
-          output.add(((Number) value).intValue());
+          output[i] = ((Number) value).intValue();
         } else {
           // This is a corner case where we have an extract of time unit like day/month pushed as Extraction Fn
           //@TODO The best way to fix this is to add explicit output Druid types to Calcite Extraction Functions impls
-          output.add(Integer.valueOf((String) value));
+          output[i] = Integer.valueOf((String) value);
         }
         break;
       case LONG:
-        output.add(((Number) value).longValue());
+        output[i] = ((Number) value).longValue();
        break;
       case FLOAT:
-        output.add(((Number) value).floatValue());
+        output[i] = ((Number) value).floatValue();
        break;
       case DOUBLE:
-        output.add(((Number) value).doubleValue());
+        output[i] = ((Number) value).doubleValue();
        break;
       case CHAR:
-        output.add(new HiveChar(value.toString(), ((CharTypeInfo) types[i]).getLength()));
+        output[i] = new HiveChar(value.toString(), ((CharTypeInfo) types[i]).getLength());
        break;
       case VARCHAR:
-        output.add(new HiveVarchar(value.toString(), ((VarcharTypeInfo) types[i]).getLength()));
+        output[i] = new HiveVarchar(value.toString(), ((VarcharTypeInfo) types[i]).getLength());
        break;
       case STRING:
-        output.add(value.toString());
+        output[i] = value.toString();
        break;
       case BOOLEAN:
        if (value instanceof Number) {
-          output.add(((Number) value).intValue() != 0);
+          output[i] = ((Number) value).intValue() != 0;
        } else {
-          output.add(Boolean.valueOf(value.toString()));
+          output[i] = Boolean.valueOf(value.toString());
        }
        break;
       default:
        throw new SerDeException("Unknown type: " + types[i].getPrimitiveCategory());
       }
     }
-
-    return output;
   }
 
   private static long deserializeToMillis(Object value) {
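deserializeAsPrimitive now writes converted values into a caller-owned array instead of returning a fresh list, so the wrapper can hand the same array in on every call. A condensed, self-contained sketch of that output-parameter style follows; the PrimitiveFillSketch class, the PrimitiveKind enum and the column names are illustrative assumptions and cover only a few of the categories handled above.

import java.util.List;
import java.util.Map;

/** Condensed sketch of filling a caller-provided array from a loosely typed row; names are illustrative. */
public final class PrimitiveFillSketch {

  /** Hypothetical subset of the primitive categories the real SerDe switches on. */
  enum PrimitiveKind { LONG, DOUBLE, STRING, BOOLEAN }

  private final String[] columns;
  private final PrimitiveKind[] types;

  PrimitiveFillSketch(String[] columns, PrimitiveKind[] types) {
    this.columns = columns;
    this.types = types;
  }

  /** Writes one converted value per column into {@code output}; the caller reuses the array across rows. */
  void deserializeAsPrimitive(Map<String, Object> row, Object[] output) {
    for (int i = 0; i < columns.length; i++) {
      Object value = row.get(columns[i]);
      if (value == null) {
        output[i] = null;
        continue;
      }
      switch (types[i]) {
      case LONG:
        output[i] = ((Number) value).longValue();
        break;
      case DOUBLE:
        output[i] = ((Number) value).doubleValue();
        break;
      case STRING:
        output[i] = value.toString();
        break;
      case BOOLEAN:
        output[i] = value instanceof Number ? ((Number) value).intValue() != 0 : Boolean.valueOf(value.toString());
        break;
      default:
        throw new IllegalArgumentException("Unknown type: " + types[i]);
      }
    }
  }

  public static void main(String[] args) {
    PrimitiveFillSketch serDe = new PrimitiveFillSketch(
        new String[] {"cint", "cdouble", "cstring1", "cbool"},
        new PrimitiveKind[] {PrimitiveKind.LONG, PrimitiveKind.DOUBLE, PrimitiveKind.STRING, PrimitiveKind.BOOLEAN});
    Object[] rowBoat = new Object[4];  // allocated once, reused for every row
    List<Map<String, Object>> rows = List.of(
        Map.of("cint", 1, "cdouble", 2.5, "cstring1", "a", "cbool", 1),
        Map.of("cint", 7, "cdouble", 0.5, "cstring1", "b", "cbool", 0));
    for (Map<String, Object> row : rows) {
      serDe.deserializeAsPrimitive(row, rowBoat);
      System.out.println(java.util.Arrays.toString(rowBoat));
    }
  }
}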
diff --git ql/src/test/queries/clientpositive/druidmini_expressions.q ql/src/test/queries/clientpositive/druidmini_expressions.q
index 2f177108cf..36aad7937d 100644
--- ql/src/test/queries/clientpositive/druidmini_expressions.q
+++ ql/src/test/queries/clientpositive/druidmini_expressions.q
@@ -200,3 +200,13 @@ explain select `timets` from (select cast(`__time` as timestamp ) as timets from
 
 select `timets_with_tz` from (select `__time` as timets_with_tz from druid_table_alltypesorc order by timets_with_tz limit 10) as src order by `timets_with_tz`;
 select `timets` from (select cast(`__time` as timestamp ) as timets from druid_table_alltypesorc order by timets limit 10) as src order by `timets`;
+
+select count(cfloat) from (select `cfloat`, `cstring1` from druid_table_alltypesorc limit 1025) as src;
+
+select count(cstring1) from (select `cfloat`, `cstring1` from druid_table_alltypesorc limit 90000) as src;
+
+explain select count(cstring1) from (select `cfloat`, `cstring1`, `cint` from druid_table_alltypesorc limit 90000) as src;
+
+select max(cint * cdouble) from (select `cfloat`, `cstring1`, `cint`, `cdouble` from druid_table_alltypesorc limit 90000) as src;
+
+explain select max(cint * cfloat) from (select `cfloat`, `cstring1`, `cint`, `cdouble` from druid_table_alltypesorc limit 90000) as src;
\ No newline at end of file
diff --git ql/src/test/results/clientpositive/druid/druidmini_expressions.q.out ql/src/test/results/clientpositive/druid/druidmini_expressions.q.out
index 47f663bbf6..973cade307 100644
--- ql/src/test/results/clientpositive/druid/druidmini_expressions.q.out
+++ ql/src/test/results/clientpositive/druid/druidmini_expressions.q.out
@@ -2272,3 +2272,160 @@ POSTHOOK: Output: hdfs://### HDFS PATH ###
 1969-12-31 15:59:00
 1969-12-31 15:59:00
 1969-12-31 15:59:00
+PREHOOK: query: select count(cfloat) from (select `cfloat`, `cstring1` from druid_table_alltypesorc limit 1025) as src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@druid_table_alltypesorc
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select count(cfloat) from (select `cfloat`, `cstring1` from druid_table_alltypesorc limit 1025) as src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@druid_table_alltypesorc
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1025
+PREHOOK: query: select count(cstring1) from (select `cfloat`, `cstring1` from druid_table_alltypesorc limit 90000) as src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@druid_table_alltypesorc
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select count(cstring1) from (select `cfloat`, `cstring1` from druid_table_alltypesorc limit 90000) as src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@druid_table_alltypesorc
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+3036
+PREHOOK: query: explain select count(cstring1) from (select `cfloat`, `cstring1`, `cint` from druid_table_alltypesorc limit 90000) as src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@druid_table_alltypesorc
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: explain select count(cstring1) from (select `cfloat`, `cstring1`, `cint` from druid_table_alltypesorc limit 90000) as src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@druid_table_alltypesorc
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: druid_table_alltypesorc
+                  properties:
+                    druid.fieldNames cstring1
+                    druid.fieldTypes string
+                    druid.query.json {"queryType":"scan","dataSource":"default.druid_table_alltypesorc","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"],"columns":["cstring1"],"resultFormat":"compactedList","limit":90000}
+                    druid.query.type scan
+                  Statistics: Num rows: 9173 Data size: 1603744 Basic stats: COMPLETE Column stats: NONE
+                  Group By Operator
+                    aggregations: count(cstring1)
+                    mode: hash
+                    outputColumnNames: _col0
+                    Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      sort order: 
+                      Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE
+                      value expressions: _col0 (type: bigint)
+            Execution mode: vectorized, llap
+            LLAP IO: no inputs
+        Reducer 2 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select max(cint * cdouble) from (select `cfloat`, `cstring1`, `cint`, `cdouble` from druid_table_alltypesorc limit 90000) as src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@druid_table_alltypesorc
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select max(cint * cdouble) from (select `cfloat`, `cstring1`, `cint`, `cdouble` from druid_table_alltypesorc limit 90000) as src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@druid_table_alltypesorc
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+8.256991051261554E15
+PREHOOK: query: explain select max(cint * cfloat) from (select `cfloat`, `cstring1`, `cint`, `cdouble` from druid_table_alltypesorc limit 90000) as src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@druid_table_alltypesorc
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: explain select max(cint * cfloat) from (select `cfloat`, `cstring1`, `cint`, `cdouble` from druid_table_alltypesorc limit 90000) as src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@druid_table_alltypesorc
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: druid_table_alltypesorc
+                  properties:
+                    druid.fieldNames cfloat,cint
+                    druid.fieldTypes float,int
+                    druid.query.json {"queryType":"scan","dataSource":"default.druid_table_alltypesorc","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"],"columns":["cfloat","cint"],"resultFormat":"compactedList","limit":90000}
+                    druid.query.type scan
+                  Statistics: Num rows: 9173 Data size: 69728 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: (UDFToFloat(cint) * cfloat) (type: float)
+                    outputColumnNames: _col0
+                    Statistics: Num rows: 9173 Data size: 69728 Basic stats: COMPLETE Column stats: NONE
+                    Group By Operator
+                      aggregations: max(_col0)
+                      mode: hash
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        sort order: 
+                        Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col0 (type: float)
+            Execution mode: vectorized, llap
+            LLAP IO: no inputs
+        Reducer 2 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: max(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+