diff --git druid-handler/pom.xml druid-handler/pom.xml index 5b8a601131..5642ea818c 100644 --- druid-handler/pom.xml +++ druid-handler/pom.xml @@ -158,13 +158,13 @@ io.druid.extensions mysql-metadata-storage ${druid.version} - + mysql mysql-connector-java - + io.druid.extensions postgresql-metadata-storage @@ -346,7 +346,7 @@ META-INF/*.SF META-INF/*.DSA META-INF/*.RSA - static/ + static/ diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/conf/DruidConstants.java druid-handler/src/java/org/apache/hadoop/hive/druid/conf/DruidConstants.java index 797d597500..0705804066 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/conf/DruidConstants.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/conf/DruidConstants.java @@ -69,7 +69,7 @@ private DruidConstants() { // value delimiter for druid columns public static final String DRUID_PARSE_SPEC_DELIMITER = "druid.parseSpec.delimiter"; - // list demiliter for multi-valued columns + // list delimiter for multi-valued columns public static final String DRUID_PARSE_SPEC_LIST_DELIMITER = "druid.parseSpec.listDelimiter"; // order of columns for delimiter and csv parse specs. diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidVectorizedWrapper.java druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidVectorizedWrapper.java index c07c24d755..5db6be8f4b 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidVectorizedWrapper.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidVectorizedWrapper.java @@ -35,11 +35,10 @@ import org.apache.hadoop.mapred.RecordReader; import java.io.IOException; -import java.util.ArrayList; import java.util.Properties; /** - * A Wrapper class that takes a row-by-row Druid Record Reader and provides a Vectorized one. + * A Wrapper class that consumes row-by-row from base Druid Record Reader and provides a Vectorized one. * @param type of the Druid query. */ public class DruidVectorizedWrapper> implements RecordReader { @@ -47,6 +46,7 @@ private final DruidQueryRecordReader baseReader; private final VectorizedRowBatchCtx rbCtx; private final DruidSerDe serDe; + private final Object[] rowBoat; /** * Actual projected columns needed by the query, this can be empty in case of query like: select count(*) from src. 
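[Editor's sketch, not part of the patch] The rowBoat field introduced just above is allocated once per reader (sized to projectedColumns in the constructor, as the next hunk shows) and refilled in place for every row inside next(), which is what lets that hunk drop the per-row ArrayList that deserializeAsPrimitive used to return. A minimal compilable sketch of that fill loop, using hypothetical stand-ins (RowSource, fillBatch, a plain Object[][] batch) rather than the real Hive RecordReader/VectorizedRowBatch API:

import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the row-by-row reader: fills the caller's rowBoat in place.
interface RowSource {
  boolean next(Object[] rowBoat);
}

final class BatchFillSketch {
  // Same shape as the patched DruidVectorizedWrapper.next(): one preallocated rowBoat,
  // reused for every row, with only the projected column indexes copied into the batch.
  static int fillBatch(RowSource source, Object[][] batch, Object[] rowBoat, int[] projectedColumns) {
    int rowsCount = 0;
    while (rowsCount < batch.length && source.next(rowBoat)) {
      for (int i : projectedColumns) {
        batch[rowsCount][i] = rowBoat[i];
      }
      rowsCount++;
    }
    return rowsCount; // caller records this as the batch size
  }

  public static void main(String[] args) {
    Iterator<Object[]> data =
        List.of(new Object[] {"dim1_val", 88L}, new Object[] {"dim1_val3", 70L}).iterator();
    RowSource source = boat -> {
      if (!data.hasNext()) {
        return false;
      }
      System.arraycopy(data.next(), 0, boat, 0, boat.length);
      return true;
    };
    Object[] rowBoat = new Object[2]; // allocated once, not per row
    int filled = fillBatch(source, new Object[4][2], rowBoat, new int[] {0, 1});
    System.out.println("rows copied into batch: " + filled);
  }
}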
@@ -77,21 +77,21 @@ public DruidVectorizedWrapper(DruidQueryRecordReader reader, Configuration jobCo } druidWritable = baseReader.createValue(); + rowBoat = new Object[projectedColumns.length]; } @Override public boolean next(NullWritable nullWritable, VectorizedRowBatch vectorizedRowBatch) throws IOException { vectorizedRowBatch.reset(); - ArrayList row; int rowsCount = 0; while (rowsCount < vectorizedRowBatch.getMaxSize() && baseReader.next(nullWritable, druidWritable)) { if (projectedColumns.length > 0) { try { - row = serDe.deserializeAsPrimitive(druidWritable); + serDe.deserializeAsPrimitive(druidWritable, rowBoat); } catch (SerDeException e) { throw new IOException(e); } for (int i : projectedColumns) { - vectorAssignRow.assignRowColumn(vectorizedRowBatch, rowsCount, i, row.get(i)); + vectorAssignRow.assignRowColumn(vectorizedRowBatch, rowsCount, i, rowBoat[i]); } } rowsCount++; diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorTuningConfig.java druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorTuningConfig.java index 00442f35af..7ea23913d2 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorTuningConfig.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorTuningConfig.java @@ -33,7 +33,7 @@ * This class is copied from druid source code * in order to avoid adding additional dependencies on druid-indexing-service. */ -@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes({ +@SuppressWarnings("ALL") @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes({ @JsonSubTypes.Type(name = "kafka", value = KafkaSupervisorTuningConfig.class) }) public class KafkaSupervisorTuningConfig extends KafkaTuningConfig { private final Integer workerThreads; diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java index 2faa1623b6..9a19da264f 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaTuningConfig.java @@ -109,7 +109,7 @@ /** * Always returns true, doesn't affect the version being built. */ - @Deprecated @JsonProperty public boolean getBuildV9Directly() { + @SuppressWarnings("SameReturnValue") @Deprecated @JsonProperty public boolean getBuildV9Directly() { return true; } diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/json/TaskReportData.java druid-handler/src/java/org/apache/hadoop/hive/druid/json/TaskReportData.java index 9e09ddb45a..f35db6d01c 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/json/TaskReportData.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/json/TaskReportData.java @@ -31,11 +31,11 @@ * This class is copied from druid source code * in order to avoid adding additional dependencies on druid-indexing-service. */ -public class TaskReportData { +@SuppressWarnings("unused") public class TaskReportData { /** * Task type used by serializer but does not have any functionality as far i can tell. 
*/ - public enum TaskType { + @SuppressWarnings("unused") public enum TaskType { ACTIVE, PUBLISHING, UNKNOWN } diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidScanQueryRecordReader.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidScanQueryRecordReader.java index db2721d6f8..4d9d1a9680 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidScanQueryRecordReader.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidScanQueryRecordReader.java @@ -50,6 +50,7 @@ } if (queryResultsIterator.hasNext()) { ScanResultValue current = queryResultsIterator.next(); + //noinspection unchecked compactedValues = ((List>) current.getEvents()).iterator(); return nextKeyValue(); } diff --git druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java index c751932dd2..696a2469c4 100644 --- druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java +++ druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java @@ -42,14 +42,6 @@ import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeSpec; import org.apache.hadoop.hive.serde2.SerDeStats; -import org.apache.hadoop.hive.serde2.io.ByteWritable; -import org.apache.hadoop.hive.serde2.io.DateWritableV2; -import org.apache.hadoop.hive.serde2.io.DoubleWritable; -import org.apache.hadoop.hive.serde2.io.HiveCharWritable; -import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable; -import org.apache.hadoop.hive.serde2.io.ShortWritable; -import org.apache.hadoop.hive.serde2.io.TimestampLocalTZWritable; -import org.apache.hadoop.hive.serde2.io.TimestampWritableV2; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StructField; @@ -73,11 +65,6 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo; -import org.apache.hadoop.io.BooleanWritable; -import org.apache.hadoop.io.FloatWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; @@ -209,7 +196,7 @@ private void initFromProperties(final Properties properties) throws SerDeExcepti final List inspectors = columnTypes.stream() - .map(PrimitiveObjectInspectorFactory::getPrimitiveWritableObjectInspector) + .map(PrimitiveObjectInspectorFactory::getPrimitiveJavaObjectInspector) .collect(Collectors.toList()); columns = columnNames.toArray(new String[0]); types = columnTypes.toArray(new PrimitiveTypeInfo[0]); @@ -254,7 +241,7 @@ private void initFromDruidQueryPlan(Properties properties, String druidQuery) { for (int i = 0; i < columnTypes.size(); ++i) { columns[i] = columnNames.get(i); types[i] = columnTypes.get(i); - inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(types[i])); + inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(types[i])); } inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors); } @@ -417,63 +404,8 @@ protected SegmentAnalysis submitMetadataRequest(String address, SegmentMetadataQ final Object value = input.isCompacted() ? 
input.getCompactedValue().get(i) : input.getValue().get(columns[i]); if (value == null) { output.add(null); - continue; - } - switch (types[i].getPrimitiveCategory()) { - case TIMESTAMP: - output.add(new TimestampWritableV2(Timestamp.ofEpochMilli(deserializeToMillis(value)))); - break; - case TIMESTAMPLOCALTZ: - output.add(new TimestampLocalTZWritable(new TimestampTZ(ZonedDateTime.ofInstant(Instant.ofEpochMilli( - deserializeToMillis(value)), ((TimestampLocalTZTypeInfo) types[i]).timeZone())))); - break; - case DATE: - output.add(new DateWritableV2(Date.ofEpochMilli(deserializeToMillis(value)))); - break; - case BYTE: - output.add(new ByteWritable(((Number) value).byteValue())); - break; - case SHORT: - output.add(new ShortWritable(((Number) value).shortValue())); - break; - case INT: - if (value instanceof Number) { - output.add(new IntWritable(((Number) value).intValue())); - } else { - // This is a corner case where we have an extract of time unit like day/month pushed as Extraction Fn - //@TODO The best way to fix this is to add explicit output Druid types to Calcite Extraction Functions impls - output.add(new IntWritable(Integer.valueOf((String) value))); - } - - break; - case LONG: - output.add(new LongWritable(((Number) value).longValue())); - break; - case FLOAT: - output.add(new FloatWritable(((Number) value).floatValue())); - break; - case DOUBLE: - output.add(new DoubleWritable(((Number) value).doubleValue())); - break; - case CHAR: - output.add(new HiveCharWritable(new HiveChar(value.toString(), ((CharTypeInfo) types[i]).getLength()))); - break; - case VARCHAR: - output.add(new HiveVarcharWritable(new HiveVarchar(value.toString(), - ((VarcharTypeInfo) types[i]).getLength()))); - break; - case STRING: - output.add(new Text(value.toString())); - break; - case BOOLEAN: - if (value instanceof Number) { - output.add(new BooleanWritable(((Number) value).intValue() != 0)); - } else { - output.add(new BooleanWritable(Boolean.valueOf(value.toString()))); - } - break; - default: - throw new SerDeException("Unknown type: " + types[i].getPrimitiveCategory()); + } else { + output.add(convertAsPrimitive(value, types[i])); } } return output; @@ -483,79 +415,65 @@ protected SegmentAnalysis submitMetadataRequest(String address, SegmentMetadataQ * Function to convert Druid Primitive values to Hive Primitives. Main usage of this is to pipe data to VectorRow. * This has the exact same logic as {@link DruidSerDe#deserialize(Writable)}, any modification here should be done * there as well. - * Reason to have 2 function is performance, merging both will bring an extra test on the hot loop. + * Reason to have 2 function is that no vectorized path expects writables. * - * @param writable Druid Writable container. - * @return ArrayList of Hive Primitives. + * @param writable Druid Writable. + * @param rowBoat Rowboat used to carry columns values. + * @throws SerDeException in case of deserialization errors. */ - public ArrayList deserializeAsPrimitive(Writable writable) throws SerDeException { + public void deserializeAsPrimitive(Writable writable, final Object[] rowBoat) throws SerDeException { final DruidWritable input = (DruidWritable) writable; - final ArrayList output = Lists.newArrayListWithExpectedSize(columns.length); - for (int i = 0; i < columns.length; i++) { final Object value = input.isCompacted() ? 
input.getCompactedValue().get(i) : input.getValue().get(columns[i]); - if (value == null) { - output.add(null); - continue; + rowBoat[i] = null; + if (value != null) { + rowBoat[i] = convertAsPrimitive(value, types[i]); } - switch (types[i].getPrimitiveCategory()) { - case TIMESTAMP: - output.add(Timestamp.ofEpochMilli(deserializeToMillis(value))); - break; - case TIMESTAMPLOCALTZ: - output.add(new TimestampTZ(ZonedDateTime.ofInstant(Instant.ofEpochMilli(deserializeToMillis(value)), - ((TimestampLocalTZTypeInfo) types[i]).timeZone()))); - break; - case DATE: - output.add(Date.ofEpochMilli(deserializeToMillis(value))); - break; - case BYTE: - output.add(((Number) value).byteValue()); - break; - case SHORT: - output.add(((Number) value).shortValue()); - break; - case INT: - if (value instanceof Number) { - output.add(((Number) value).intValue()); - } else { - // This is a corner case where we have an extract of time unit like day/month pushed as Extraction Fn - //@TODO The best way to fix this is to add explicit output Druid types to Calcite Extraction Functions impls - output.add(Integer.valueOf((String) value)); - } + } + } - break; - case LONG: - output.add(((Number) value).longValue()); - break; - case FLOAT: - output.add(((Number) value).floatValue()); - break; - case DOUBLE: - output.add(((Number) value).doubleValue()); - break; - case CHAR: - output.add(new HiveChar(value.toString(), ((CharTypeInfo) types[i]).getLength())); - break; - case VARCHAR: - output.add(new HiveVarchar(value.toString(), ((VarcharTypeInfo) types[i]).getLength())); - break; - case STRING: - output.add(value.toString()); - break; - case BOOLEAN: - if (value instanceof Number) { - output.add(((Number) value).intValue() != 0); - } else { - output.add(Boolean.valueOf(value.toString())); - } - break; - default: - throw new SerDeException("Unknown type: " + types[i].getPrimitiveCategory()); + private static Object convertAsPrimitive(Object value, PrimitiveTypeInfo typeInfo) throws SerDeException { + switch (typeInfo.getPrimitiveCategory()) { + case TIMESTAMP: + return Timestamp.ofEpochMilli(deserializeToMillis(value)); + case TIMESTAMPLOCALTZ: + return new TimestampTZ(ZonedDateTime.ofInstant(Instant.ofEpochMilli(deserializeToMillis(value)), + ((TimestampLocalTZTypeInfo) typeInfo).timeZone())); + case DATE: + return (Date.ofEpochMilli(deserializeToMillis(value))); + case BYTE: + return ((Number) value).byteValue(); + case SHORT: + return ((Number) value).shortValue(); + case INT: + if (value instanceof Number) { + return ((Number) value).intValue(); + } else { + // This is a corner case where we have an extract of time unit like day/month pushed as Extraction Fn + //@TODO The best way to fix this is to add explicit output Druid types to Calcite Extraction Functions impls + return (Integer.valueOf((String) value)); } + case LONG: + return ((Number) value).longValue(); + case FLOAT: + return ((Number) value).floatValue(); + case DOUBLE: + return ((Number) value).doubleValue(); + case CHAR: + return new HiveChar(value.toString(), ((CharTypeInfo) typeInfo).getLength()); + case VARCHAR: + return new HiveVarchar(value.toString(), ((VarcharTypeInfo) typeInfo).getLength()); + case STRING: + return value.toString(); + case BOOLEAN: + if (value instanceof Number) { + return (((Number) value).intValue() != 0); + } else { + return (Boolean.valueOf(value.toString())); + } + default: + throw new SerDeException("Unknown type: " + typeInfo.getPrimitiveCategory()); } - - return output; } private static long deserializeToMillis(Object 
value) { diff --git druid-handler/src/test/org/apache/hadoop/hive/druid/serde/TestDruidSerDe.java druid-handler/src/test/org/apache/hadoop/hive/druid/serde/TestDruidSerDe.java index 74bceec169..656dc25e6f 100644 --- druid-handler/src/test/org/apache/hadoop/hive/druid/serde/TestDruidSerDe.java +++ druid-handler/src/test/org/apache/hadoop/hive/druid/serde/TestDruidSerDe.java @@ -67,7 +67,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; -import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; @@ -154,15 +153,15 @@ // Timeseries query results as records private static final Object[][] TIMESERIES_QUERY_RESULTS_RECORDS = new Object[][]{ new Object[]{ - new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1325376000000L).atZone(ZoneOffset.UTC))), - new LongWritable(0), - new FloatWritable(1.0F), - new FloatWritable(2.2222F) }, + new TimestampTZ(Instant.ofEpochMilli(1325376000000L).atZone(ZoneOffset.UTC)), + 0L, + 1.0F, + 2.2222F }, new Object[]{ - new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1325462400000L).atZone(ZoneOffset.UTC))), - new LongWritable(2), - new FloatWritable(3.32F), - new FloatWritable(4F) } }; + new TimestampTZ(Instant.ofEpochMilli(1325462400000L).atZone(ZoneOffset.UTC)), + 2L, + 3.32F, + 4F } }; // Timeseries query results as records (types defined by metastore) private static final String TIMESERIES_COLUMN_NAMES = "timestamp,sample_name1,sample_name2,sample_divide"; @@ -269,35 +268,20 @@ // TopN query results as records private static final Object[][] TOPN_QUERY_RESULTS_RECORDS = new Object[][]{ new Object[]{ - new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1377907200000L).atZone(ZoneOffset.UTC))), - new Text("dim1_val"), - new LongWritable(111), - new FloatWritable(10669F), - new FloatWritable(96.11711711711712F) }, + new TimestampTZ(Instant.ofEpochMilli(1377907200000L).atZone(ZoneOffset.UTC)), + "dim1_val", 111L, 10669F, 96.11711711711712F }, new Object[]{ - new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1377907200000L).atZone(ZoneOffset.UTC))), - new Text("another_dim1_val"), - new LongWritable(88), - new FloatWritable(28344F), - new FloatWritable(322.09090909090907F) }, + (new TimestampTZ(Instant.ofEpochMilli(1377907200000L).atZone(ZoneOffset.UTC))), + "another_dim1_val", 88L, 28344F, 322.09090909090907F }, new Object[]{ - new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1377907200000L).atZone(ZoneOffset.UTC))), - new Text("dim1_val3"), - new LongWritable(70), - new FloatWritable(871F), - new FloatWritable(12.442857142857143F) }, + (new TimestampTZ(Instant.ofEpochMilli(1377907200000L).atZone(ZoneOffset.UTC))), + "dim1_val3", 70L, 871F, 12.442857142857143F }, new Object[]{ - new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1377907200000L).atZone(ZoneOffset.UTC))), - new Text("dim1_val4"), - new LongWritable(62), - new FloatWritable(815F), - new FloatWritable(13.14516129032258F) }, + new TimestampTZ(Instant.ofEpochMilli(1377907200000L).atZone(ZoneOffset.UTC)), + "dim1_val4", 62L, 815F, 13.14516129032258F }, new Object[]{ - new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1377907200000L).atZone(ZoneOffset.UTC))), - new Text("dim1_val5"), - new LongWritable(60), - 
new FloatWritable(2787F), - new FloatWritable(46.45F) } }; + (new TimestampTZ(Instant.ofEpochMilli(1377907200000L).atZone(ZoneOffset.UTC))), + "dim1_val5", 60L, 2787F, 46.45F } }; // TopN query results as records (types defined by metastore) private static final String TOPN_COLUMN_NAMES = "timestamp,sample_dim,count,some_metric,sample_divide"; @@ -437,39 +421,25 @@ // GroupBy query results as records private static final Object[][] GROUP_BY_QUERY_EXTRACTION_RESULTS_RECORDS = new Object[][]{ new Object[]{ - new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1325376000000L).atZone(ZoneOffset.UTC))), - new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1325376000000L).atZone(ZoneOffset.UTC))), - new LongWritable(200) }, + (new TimestampTZ(Instant.ofEpochMilli(1325376000000L).atZone(ZoneOffset.UTC))), + (new TimestampTZ(Instant.ofEpochMilli(1325376000000L).atZone(ZoneOffset.UTC))), 200L }, new Object[]{ - new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1325376012000L).atZone(ZoneOffset.UTC))), - new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1325376012000L).atZone(ZoneOffset.UTC))), - new LongWritable(400) } }; + (new TimestampTZ(Instant.ofEpochMilli(1325376012000L).atZone(ZoneOffset.UTC))), + (new TimestampTZ(Instant.ofEpochMilli(1325376012000L).atZone(ZoneOffset.UTC))), 400L } }; private static final Object[][] GROUP_BY_QUERY_RESULTS_RECORDS = new Object[][]{ new Object[]{ - new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1325376000000L).atZone(ZoneOffset.UTC))), - new Text("India"), - new Text("phone"), - new LongWritable(88), - new DoubleWritable(29.91233453), - new FloatWritable(60.32F) }, + (new TimestampTZ(Instant.ofEpochMilli(1325376000000L).atZone(ZoneOffset.UTC))), "India", + "phone", 88L, 29.91233453, 60.32F }, new Object[]{ - new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1325376012000L).atZone(ZoneOffset.UTC))), - new Text("Spain"), - new Text("pc"), - new LongWritable(16), - new DoubleWritable(172.93494959), - new FloatWritable(6.333333F) } }; + (new TimestampTZ(Instant.ofEpochMilli(1325376012000L).atZone(ZoneOffset.UTC))), + "Spain", "pc", 16L, 172.93494959, 6.333333F } }; private static final Object[][] GB_MONTH_EXTRACTION_RESULTS_RECORDS = new Object[][]{ new Object[]{ - new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1325376000000L).atZone(ZoneOffset.UTC))), - new IntWritable(1), - new LongWritable(200) }, + (new TimestampTZ(Instant.ofEpochMilli(1325376000000L).atZone(ZoneOffset.UTC))), 1, 200L }, new Object[]{ - new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1325376012000L).atZone(ZoneOffset.UTC))), - new IntWritable(1), - new LongWritable(400) } }; + (new TimestampTZ(Instant.ofEpochMilli(1325376012000L).atZone(ZoneOffset.UTC))), 1, 400L } }; // GroupBy query results as records (types defined by metastore) private static final String GROUP_BY_COLUMN_NAMES = "timestamp,country,device,total_usage,data_transfer,avg_usage"; @@ -617,80 +587,50 @@ + "float,float"; private static final Object[][] SELECT_QUERY_RESULTS_RECORDS = new Object[][]{ new Object[]{ - new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1356998400000L).atZone(ZoneOffset.UTC))), - new BooleanWritable(true), - new Text("article"), - new Text("0"), - new Text("0"), - new Text("11._korpus_(NOVJ)"), - new Text("sl"), - new Text("0"), - new Text("EmausBot"), - new DoubleWritable(1.0d), - new DoubleWritable(39.0d), - new FloatWritable(39.0F), - new 
FloatWritable(39.0F), - new FloatWritable(0.0F) }, + (new TimestampTZ(Instant.ofEpochMilli(1356998400000L).atZone(ZoneOffset.UTC))), Boolean.TRUE, + "article", + "0", + "0", + "11._korpus_(NOVJ)", + "sl", + "0", + "EmausBot", 1.0d, 39.0d, 39.0F, 39.0F, 0.0F }, new Object[]{ - new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1356998400000L).atZone(ZoneOffset.UTC))), - new BooleanWritable(false), - new Text("article"), - new Text("0"), - new Text("0"), - new Text("112_U.S._580"), - new Text("en"), - new Text("1"), - new Text("MZMcBride"), - new DoubleWritable(1.0d), - new DoubleWritable(70.0d), - new FloatWritable(70.0F), - new FloatWritable(70.0F), - new FloatWritable(0.0F) }, + (new TimestampTZ(Instant.ofEpochMilli(1356998400000L).atZone(ZoneOffset.UTC))), Boolean.FALSE, + "article", + "0", + "0", + "112_U.S._580", + "en", + "1", + "MZMcBride", 1.0d, 70.0d, 70.0F, 70.0F, 0.0F }, new Object[]{ - new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1356998412000L).atZone(ZoneOffset.UTC))), - new BooleanWritable(false), - new Text("article"), - new Text("0"), - new Text("0"), - new Text("113_U.S._243"), - new Text("en"), - new Text("1"), - new Text("MZMcBride"), - new DoubleWritable(1.0d), - new DoubleWritable(77.0d), - new FloatWritable(77.0F), - new FloatWritable(77.0F), - new FloatWritable(0.0F) }, + (new TimestampTZ(Instant.ofEpochMilli(1356998412000L).atZone(ZoneOffset.UTC))), Boolean.FALSE, + "article", + "0", + "0", + "113_U.S._243", + "en", + "1", + "MZMcBride", 1.0d, 77.0d, 77.0F, 77.0F, 0.0F }, new Object[]{ - new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1356998412000L).atZone(ZoneOffset.UTC))), - new BooleanWritable(false), - new Text("article"), - new Text("0"), - new Text("0"), - new Text("113_U.S._73"), - new Text("en"), - new Text("1"), - new Text("MZMcBride"), - new DoubleWritable(1.0d), - new DoubleWritable(70.0d), - new FloatWritable(70.0F), - new FloatWritable(70.0F), - new FloatWritable(0.0F) }, + (new TimestampTZ(Instant.ofEpochMilli(1356998412000L).atZone(ZoneOffset.UTC))), Boolean.FALSE, + "article", + "0", + "0", + "113_U.S._73", + "en", + "1", + "MZMcBride", 1.0d, 70.0d, 70.0F, 70.0F, 0.0F }, new Object[]{ - new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1356998412000L).atZone(ZoneOffset.UTC))), - new BooleanWritable(false), - new Text("article"), - new Text("0"), - new Text("0"), - new Text("113_U.S._756"), - new Text("en"), - new Text("1"), - new Text("MZMcBride"), - new DoubleWritable(1.0d), - new DoubleWritable(68.0d), - new FloatWritable(68.0F), - new FloatWritable(68.0F), - new FloatWritable(0.0F) } }; + (new TimestampTZ(Instant.ofEpochMilli(1356998412000L).atZone(ZoneOffset.UTC))), Boolean.FALSE, + "article", + "0", + "0", + "113_U.S._756", + "en", + "1", + "MZMcBride", 1.0d, 68.0d, 68.0F, 68.0F, 0.0F } }; // Scan query private static final String @@ -869,7 +809,6 @@ private static Properties createPropertiesQuery(String dataSource, final HiveDruidSplit split = new HiveDruidSplit(jsonQuery, new Path("empty"), new String[]{"testing_host"}); - assert reader != null; reader.initialize(split, DruidStorageHandlerUtils.JSON_MAPPER, DruidStorageHandlerUtils.SMILE_MAPPER, httpClient); StructObjectInspector oi = (StructObjectInspector) serDe.getObjectInspector(); List fieldRefs = oi.getAllStructFieldRefs(); @@ -895,7 +834,6 @@ private static Properties createPropertiesQuery(String dataSource, futureResult.set(new ByteArrayInputStream(resultString)); when(httpClient.go(anyObject(), 
any(HttpResponseHandler.class))).thenReturn(futureResult); reader = DruidQueryBasedInputFormat.getDruidQueryReader(queryType); - assert reader != null; reader.initialize(split, DruidStorageHandlerUtils.JSON_MAPPER, DruidStorageHandlerUtils.SMILE_MAPPER, httpClient); pos = 0; @@ -1028,16 +966,15 @@ private static void serializeObject(Properties properties, } private static final Object[] ROW_OBJECT_2 = new Object[]{ - new TimestampLocalTZWritable(new TimestampTZ(Instant.ofEpochMilli(1377907200000L).atZone(ZoneOffset.UTC))), - new Text("dim1_val"), - new HiveCharWritable(new HiveChar("dim2_v", 6)), - new HiveVarcharWritable(new HiveVarchar("dim3_val", 8)), - new DoubleWritable(10669.3D), - new FloatWritable(10669.45F), - new LongWritable(1113939), - new IntWritable(1112123), - new ShortWritable((short) 12), - new ByteWritable((byte) 0) }; + new TimestampTZ(Instant.ofEpochMilli(1377907200000L).atZone(ZoneOffset.UTC)), "dim1_val", + new HiveChar("dim2_v", 6), + new HiveVarchar("dim3_val", 8), + 10669.3D, + 10669.45F, + 1113939L, + 1112123, + ((short) 12), + ((byte) 0)}; private static final DruidWritable DRUID_WRITABLE_2 = new DruidWritable(ImmutableMap.builder().put("__time", 1377907200000L) diff --git druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java index ed6e430c8a..0df150dbd0 100644 --- druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java +++ druid-handler/src/test/org/apache/hadoop/hive/ql/io/TestDruidRecordWriter.java @@ -173,11 +173,11 @@ }, objectMapper); Path - segmentDescriptroPath = + segmentDescriptorPath = new Path(workingDir.getAbsolutePath(), DruidStorageHandler.SEGMENTS_DESCRIPTOR_DIR_NAME); DruidRecordWriter druidRecordWriter = - new DruidRecordWriter(dataSchema, tuningConfig, dataSegmentPusher, 20, segmentDescriptroPath, localFileSystem); + new DruidRecordWriter(dataSchema, tuningConfig, dataSegmentPusher, 20, segmentDescriptorPath, localFileSystem); List druidWritables = @@ -194,7 +194,7 @@ druidRecordWriter.write(druidWritable); } druidRecordWriter.close(false); - List dataSegmentList = DruidStorageHandlerUtils.getCreatedSegments(segmentDescriptroPath, config); + List dataSegmentList = DruidStorageHandlerUtils.getCreatedSegments(segmentDescriptorPath, config); Assert.assertEquals(1, dataSegmentList.size()); File tmpUnzippedSegmentDir = temporaryFolder.newFolder(); new LocalDataSegmentPuller().getSegmentFiles(dataSegmentList.get(0), tmpUnzippedSegmentDir); diff --git ql/src/test/queries/clientpositive/druidmini_expressions.q ql/src/test/queries/clientpositive/druidmini_expressions.q index 2f177108cf..36aad7937d 100644 --- ql/src/test/queries/clientpositive/druidmini_expressions.q +++ ql/src/test/queries/clientpositive/druidmini_expressions.q @@ -200,3 +200,13 @@ explain select `timets` from (select cast(`__time` as timestamp ) as timets from select `timets_with_tz` from (select `__time` as timets_with_tz from druid_table_alltypesorc order by timets_with_tz limit 10) as src order by `timets_with_tz`; select `timets` from (select cast(`__time` as timestamp ) as timets from druid_table_alltypesorc order by timets limit 10) as src order by `timets`; + +select count(cfloat) from (select `cfloat`, `cstring1` from druid_table_alltypesorc limit 1025) as src; + +select count(cstring1) from (select `cfloat`, `cstring1` from druid_table_alltypesorc limit 90000) as src; + +explain select count(cstring1) from (select `cfloat`, `cstring1`, `cint` from 
druid_table_alltypesorc limit 90000) as src; + +select max(cint * cdouble) from (select `cfloat`, `cstring1`, `cint`, `cdouble` from druid_table_alltypesorc limit 90000) as src; + +explain select max(cint * cfloat) from (select `cfloat`, `cstring1`, `cint`, `cdouble` from druid_table_alltypesorc limit 90000) as src; \ No newline at end of file diff --git ql/src/test/results/clientpositive/druid/druidmini_expressions.q.out ql/src/test/results/clientpositive/druid/druidmini_expressions.q.out index 47f663bbf6..973cade307 100644 --- ql/src/test/results/clientpositive/druid/druidmini_expressions.q.out +++ ql/src/test/results/clientpositive/druid/druidmini_expressions.q.out @@ -2272,3 +2272,160 @@ POSTHOOK: Output: hdfs://### HDFS PATH ### 1969-12-31 15:59:00 1969-12-31 15:59:00 1969-12-31 15:59:00 +PREHOOK: query: select count(cfloat) from (select `cfloat`, `cstring1` from druid_table_alltypesorc limit 1025) as src +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_table_alltypesorc +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select count(cfloat) from (select `cfloat`, `cstring1` from druid_table_alltypesorc limit 1025) as src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_table_alltypesorc +POSTHOOK: Output: hdfs://### HDFS PATH ### +1025 +PREHOOK: query: select count(cstring1) from (select `cfloat`, `cstring1` from druid_table_alltypesorc limit 90000) as src +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_table_alltypesorc +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select count(cstring1) from (select `cfloat`, `cstring1` from druid_table_alltypesorc limit 90000) as src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_table_alltypesorc +POSTHOOK: Output: hdfs://### HDFS PATH ### +3036 +PREHOOK: query: explain select count(cstring1) from (select `cfloat`, `cstring1`, `cint` from druid_table_alltypesorc limit 90000) as src +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_table_alltypesorc +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: explain select count(cstring1) from (select `cfloat`, `cstring1`, `cint` from druid_table_alltypesorc limit 90000) as src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_table_alltypesorc +POSTHOOK: Output: hdfs://### HDFS PATH ### +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: druid_table_alltypesorc + properties: + druid.fieldNames cstring1 + druid.fieldTypes string + druid.query.json {"queryType":"scan","dataSource":"default.druid_table_alltypesorc","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"],"columns":["cstring1"],"resultFormat":"compactedList","limit":90000} + druid.query.type scan + Statistics: Num rows: 9173 Data size: 1603744 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count(cstring1) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Execution mode: vectorized, llap + LLAP IO: no inputs + Reducer 2 + Execution mode: vectorized, llap + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + 
outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 192 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select max(cint * cdouble) from (select `cfloat`, `cstring1`, `cint`, `cdouble` from druid_table_alltypesorc limit 90000) as src +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_table_alltypesorc +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select max(cint * cdouble) from (select `cfloat`, `cstring1`, `cint`, `cdouble` from druid_table_alltypesorc limit 90000) as src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_table_alltypesorc +POSTHOOK: Output: hdfs://### HDFS PATH ### +8.256991051261554E15 +PREHOOK: query: explain select max(cint * cfloat) from (select `cfloat`, `cstring1`, `cint`, `cdouble` from druid_table_alltypesorc limit 90000) as src +PREHOOK: type: QUERY +PREHOOK: Input: default@druid_table_alltypesorc +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: explain select max(cint * cfloat) from (select `cfloat`, `cstring1`, `cint`, `cdouble` from druid_table_alltypesorc limit 90000) as src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@druid_table_alltypesorc +POSTHOOK: Output: hdfs://### HDFS PATH ### +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: druid_table_alltypesorc + properties: + druid.fieldNames cfloat,cint + druid.fieldTypes float,int + druid.query.json {"queryType":"scan","dataSource":"default.druid_table_alltypesorc","intervals":["1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z"],"columns":["cfloat","cint"],"resultFormat":"compactedList","limit":90000} + druid.query.type scan + Statistics: Num rows: 9173 Data size: 69728 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: (UDFToFloat(cint) * cfloat) (type: float) + outputColumnNames: _col0 + Statistics: Num rows: 9173 Data size: 69728 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: max(_col0) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: float) + Execution mode: vectorized, llap + LLAP IO: no inputs + Reducer 2 + Execution mode: vectorized, llap + Reduce Operator Tree: + Group By Operator + aggregations: max(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: 
-1 + Processor Tree: + ListSink +
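[Editor's sketch, not part of the patch] The other half of this change is in DruidSerDe: both the row-mode and vectorized paths now produce plain Java primitives (getPrimitiveJavaObjectInspector instead of getPrimitiveWritableObjectInspector) and funnel through the single convertAsPrimitive(value, typeInfo) helper, which is why the test expectations above switch from Writable wrappers (LongWritable, Text, FloatWritable, ...) to bare Long/String/Float values. A minimal standalone sketch of that shared-converter idea, with hypothetical names (SimpleType, toJavaPrimitive) and plain JDK types instead of Hive's PrimitiveTypeInfo machinery; the column names in main are illustrative sample data only:

import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the handful of Hive primitive categories exercised here.
enum SimpleType { LONG, DOUBLE, BOOLEAN, STRING }

final class ConvertAsPrimitiveSketch {
  // Single conversion routine shared by every deserialization path,
  // playing the role of DruidSerDe.convertAsPrimitive in the patch.
  static Object toJavaPrimitive(Object value, SimpleType type) {
    switch (type) {
      case LONG:
        return ((Number) value).longValue();
      case DOUBLE:
        return ((Number) value).doubleValue();
      case BOOLEAN:
        // like the patch: numeric truthiness first, string parsing otherwise
        return value instanceof Number ? ((Number) value).intValue() != 0 : Boolean.valueOf(value.toString());
      case STRING:
      default:
        return value.toString();
    }
  }

  public static void main(String[] args) {
    String[] columns = {"cstring1", "cint", "cdouble", "cboolean1"};
    SimpleType[] types = {SimpleType.STRING, SimpleType.LONG, SimpleType.DOUBLE, SimpleType.BOOLEAN};

    Map<String, Object> event = new HashMap<>();
    event.put("cstring1", "dim1_val");
    event.put("cint", 88);
    event.put("cdouble", 29.91233453);
    event.put("cboolean1", 1);

    for (int i = 0; i < columns.length; i++) {
      Object value = event.get(columns[i]);
      Object converted = value == null ? null : toJavaPrimitive(value, types[i]);
      System.out.println(columns[i] + " -> " + converted);
    }
  }
}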